提交 1aac8841 编写于 作者: S shenglian zhou

Merge remote-tracking branch 'origin/develop' into szhou/feature/support-math-functions

......@@ -456,23 +456,42 @@ pipeline {
nohup taosd >/dev/null &
sleep 10
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
cd ${WKC}/src/connector/python
export PYTHONPATH=$PWD/
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
pip3 install pytest
pytest tests/
python3 examples/bind-multi.py
python3 examples/bind-row.py
python3 examples/demo.py
python3 examples/insert-lines.py
python3 examples/pep-249.py
python3 examples/query-async.py
python3 examples/query-objectively.py
python3 examples/subscribe-sync.py
python3 examples/subscribe-async.py
'''
sh '''
cd ${WKC}/tests/examples/nodejs
npm install td2.0-connector > /dev/null 2>&1
node nodejsChecker.js host=localhost
node test1970.js
cd ${WKC}/tests/connectorTest/nodejsTest/nanosupport
npm install td2.0-connector > /dev/null 2>&1
node nanosecondTest.js
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo -c /etc/taos
dotnet build -c Release
tree | true
./bin/Release/net5.0/taosdemo -c /etc/taos -y
'''
}
}
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
......
[Unit]
Description=TDengine server service
After=network-online.target
Wants=network-online.target
After=network-online.target taosadapter.service
Wants=network-online.target taosadapter.service
[Service]
Type=simple
......
......@@ -3,7 +3,7 @@
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
set -e
#set -x
set -x
# release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...]
......@@ -414,10 +414,14 @@ fi
if [[ "$httpdBuild" == "true" ]]; then
BUILD_HTTP=true
BUILD_TOOLS=false
else
BUILD_HTTP=false
fi
if [[ "$pagMode" == "full" ]]; then
BUILD_TOOLS=true
else
BUILD_TOOLS=false
fi
# check support cpu type
......@@ -514,15 +518,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/deb
${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/deb ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}')
${csudo} ./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========dpkg command not exist, so not release deb package!!!"
fi
ret='0'
......@@ -537,15 +543,17 @@ if [ "$osType" != "Darwin" ]; then
cd ${script_dir}/rpm
${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${verNumber} ${cpuType} ${osType} ${verMode} ${verType}
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/src/kit/taos-tools/packaging/rpm ]; then
cd ${top_dir}/src/kit/taos-tools/packaging/rpm
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
taos_tools_ver=$(git describe --tags|sed -e 's/ver-//g'|awk -F '-' '{print $1}'|sed -e 's/-/_/g')
${csudo} ./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi
fi
else
else
echo "==========rpmbuild command not exist, so not release rpm package!!!"
fi
fi
......
......@@ -188,9 +188,6 @@ function update() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -215,9 +212,6 @@ function install() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -189,9 +189,6 @@ function update() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -216,9 +213,6 @@ function install() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -247,9 +247,6 @@ function update_PowerDB() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -275,9 +272,6 @@ function install_PowerDB() {
install_header
install_lib
install_jemalloc
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -189,9 +189,6 @@ function update_prodb() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -216,9 +213,6 @@ function install_prodb() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -193,9 +193,6 @@ function update_tq() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......@@ -220,9 +217,6 @@ function install_tq() {
install_log
install_header
install_lib
if [ "$pagMode" != "lite" ]; then
install_connector
fi
install_examples
install_bin
install_config
......
......@@ -69,10 +69,10 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/jh_taos
cp ${script_dir}/remove_jh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_jh.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/jh_taos
cp ${script_dir}/remove_jh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_jh.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/jhdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/jh_taosdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
......
......@@ -69,10 +69,10 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/khclient
cp ${script_dir}/remove_kh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_kh.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/khclient
cp ${script_dir}/remove_kh.sh ${install_dir}/bin
cp ${script_dir}/remove_client_kh.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/khdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/khdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
......
......@@ -109,10 +109,10 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${script_dir}/remove_client_power.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/power
cp ${script_dir}/remove_power.sh ${install_dir}/bin
cp ${script_dir}/remove_client_power.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/powerdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/powerdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
......
......@@ -69,10 +69,10 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/prodbc
cp ${script_dir}/remove_pro.sh ${install_dir}/bin
cp ${script_dir}/remove_client_pro.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/prodbc
cp ${script_dir}/remove_pro.sh ${install_dir}/bin
cp ${script_dir}/remove_client_pro.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/prodemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/prodump
cp ${script_dir}/set_core.sh ${install_dir}/bin
......
......@@ -69,10 +69,10 @@ if [ "$osType" != "Darwin" ]; then
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
cp ${build_dir}/bin/taos ${install_dir}/bin/tq
cp ${script_dir}/remove_tq.sh ${install_dir}/bin
cp ${script_dir}/remove_client_tq.sh ${install_dir}/bin
else
cp ${build_dir}/bin/taos ${install_dir}/bin/tq
cp ${script_dir}/remove_tq.sh ${install_dir}/bin
cp ${script_dir}/remove_client_tq.sh ${install_dir}/bin
cp ${build_dir}/bin/taosdemo ${install_dir}/bin/tqdemo
cp ${build_dir}/bin/taosdump ${install_dir}/bin/tqdump
cp ${script_dir}/set_core.sh ${install_dir}/bin
......
......@@ -256,7 +256,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson);
void tscDequoteAndTrimToken(SStrToken* pToken);
void tscRmEscapeAndTrimToken(SStrToken* pToken);
......@@ -264,7 +264,7 @@ int32_t tscValidateName(SStrToken* pToken, bool escapeEnabled, bool *dbIncluded)
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
......@@ -394,6 +394,9 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
int8_t jsonType2DbType(double data, int jsonType);
void getJsonKey(SStrToken *t0);
char* cloneCurrentDBName(SSqlObj* pSql);
......
......@@ -453,7 +453,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
......
......@@ -547,6 +547,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_JSON: {
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char *)row[i], length[i]));
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int precision = taos_result_precision(result);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]), precision);
......
......@@ -85,16 +85,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(dst, type, pField->bytes);
int32_t bytes = pSchema[i].bytes;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY){
bytes -= VARSTR_HEADER_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
}
}
else if(pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
bytes -= VARSTR_HEADER_SIZE;
bytes = bytes / TSDB_NCHAR_SIZE;
}
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
......@@ -222,7 +221,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
return -1;
}
uint8_t type = fields[idx].type;
int32_t length = lengths[idx];
int32_t length = lengths[idx];
switch (type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -248,6 +247,7 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
memcpy(result, val, length);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
......@@ -385,6 +385,19 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
}
break;
case TSDB_DATA_TYPE_JSON:
if (pToken->n >= pSchema->bytes) { // reserve 1 byte for select
return tscInvalidOperationMsg(msg, "json tag length too long", pToken->z);
}
if (pToken->type == TK_NULL) {
*(int8_t *)payload = TSDB_DATA_TINYINT_NULL;
} else if (pToken->type != TK_STRING){
tscInvalidOperationMsg(msg, "invalid json data", pToken->z);
} else{
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
......@@ -1094,9 +1107,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return code;
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal, false);
if(pSchema->type == TSDB_DATA_TYPE_JSON){
assert(spd.numOfBound == 1);
if(sToken.n > TSDB_MAX_JSON_TAGS_LEN/TSDB_NCHAR_SIZE){
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "json tag too long", NULL);
}
char* json = strndup(sToken.z, sToken.n);
code = parseJsontoTagData(json, &kvRowBuilder, pInsertParam->msg, pTagSchema[spd.boundedColumns[0]].colId);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
tfree(json);
return code;
}
tfree(json);
}
}
tscDestroyBoundColumnInfo(&spd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
......@@ -1110,7 +1140,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (pInsertParam->tagData.dataLen <= 0){
return tscSQLSyntaxErrMsg(pInsertParam->msg, "tag value expected", NULL);
}
char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen);
if (pTag == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -887,7 +887,7 @@ static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableNa
TAOS_RES* res = taos_query(taos, sql);
free(sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
return code;
}
......@@ -1303,14 +1303,6 @@ clean_up:
return code;
}
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
info->id = genLinesSmlId();
int code = tscSmlInsert(taos, points, numPoint, info);
free(info);
return code;
}
//=========================================================================
/* Field Escape charaters
......
......@@ -1533,6 +1533,41 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
return TSDB_CODE_SUCCESS;
}
int32_t stmtValidateValuesFields(SSqlCmd *pCmd, char * sql) {
int32_t loopCont = 1, index0 = 0, values = 0;
SStrToken sToken;
while (loopCont) {
sToken = tStrGetToken(sql, &index0, false);
if (sToken.n <= 0) {
return TSDB_CODE_SUCCESS;
}
switch (sToken.type) {
case TK_RP:
if (values) {
return TSDB_CODE_SUCCESS;
}
break;
case TK_VALUES:
values = 1;
break;
case TK_QUESTION:
case TK_LP:
break;
default:
if (values) {
tscError("only ? allowed in values");
return tscInvalidOperationMsg(pCmd->payload, "only ? allowed in values", NULL);
}
break;
}
}
return TSDB_CODE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
......@@ -1637,6 +1672,11 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_RET(ret);
}
ret = stmtValidateValuesFields(&pSql->cmd, pSql->sqlstr);
if (ret != TSDB_CODE_SUCCESS) {
STMT_RET(ret);
}
if (pStmt->multiTbInsert) {
STMT_RET(TSDB_CODE_SUCCESS);
}
......
此差异已折叠。
......@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......@@ -832,7 +832,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
if (validateColumn && !tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
tscError("0x%"PRIx64" table schema is not matched with parsed sql", id);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SArray* queryOperator = createExecOperatorPlan(&query);
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = query.numOfTags;
......@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY);
tlv->len = htonl(sizeof(int16_t));
*(int16_t *)tlv->value = htons(12345);
pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg;
tlv->len = 0;
pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen);
......@@ -1492,7 +1510,8 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateTableSql *pCreateTableInfo = pInfo->pCreateTableInfo;
if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
int32_t numOfTables = (int32_t)taosArrayGetSize(pInfo->pCreateTableInfo->childTableInfo);
size += numOfTables * (sizeof(SCreateTableMsg) + TSDB_MAX_TAGS_LEN);
size += numOfTables * (sizeof(SCreateTableMsg) +
((TSDB_MAX_TAGS_LEN > TSDB_MAX_JSON_TAGS_LEN)?TSDB_MAX_TAGS_LEN:TSDB_MAX_JSON_TAGS_LEN));
} else {
size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count);
}
......@@ -1614,9 +1633,6 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
}
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
......@@ -1645,14 +1661,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSchema++;
}
pMsg = (char *)pSchema;
pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
if (pAlterInfo->tagData.dataLen > 0) {
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
}
pMsg += pAlterInfo->tagData.dataLen;
msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
int msgLen = sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_TABLE;
......@@ -1854,7 +1863,9 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo, pSql->self, true);
bool convertJson = true;
if (pQueryInfo->isStddev == true) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -2813,7 +2824,11 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
int payLoadLen = TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen;
if (autocreate && pSql->cmd.insertParam.tagData.dataLen != 0) {
payLoadLen += pSql->cmd.insertParam.tagData.dataLen;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, payLoadLen)) {
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
tscFreeSqlObj(pNew);
......
......@@ -445,7 +445,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
// revise the length for binary and nchar fields
if (f[j].type == TSDB_DATA_TYPE_BINARY) {
f[j].bytes -= VARSTR_HEADER_SIZE;
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR) {
} else if (f[j].type == TSDB_DATA_TYPE_NCHAR || f[j].type == TSDB_DATA_TYPE_JSON) {
f[j].bytes = (f[j].bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
}
......
......@@ -227,6 +227,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
if (skipped) {
slot = 0;
stackidx = 0;
tVariantDestroy(&tag);
continue;
}
......@@ -334,6 +335,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
}
if (mergeDone) {
tVariantDestroy(&tag);
break;
}
......@@ -341,6 +343,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
stackidx = 0;
skipRemainValue(mainCtx->p->pTSBuf, &tag);
tVariantDestroy(&tag);
}
stackidx = 0;
......@@ -633,16 +636,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the join condition tag column info, todo extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
tVariant* pVariant = &pExpr->base.param[0];
tVariant* pVariant = &pExpr->base.param[pExpr->base.numOfParams];
pVariant->i64 = colId;
pVariant->nType = TSDB_DATA_TYPE_BIGINT;
pVariant->nLen = sizeof(int64_t);
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams++;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
......@@ -729,8 +737,15 @@ int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
if (t1->padding == TSDB_DATA_TYPE_JSON){
bool canReturn = true;
int32_t result = jsonCompareUnit(t1->tag, t2->tag, &canReturn);
if(canReturn) return result;
__compar_fn_t func = getComparFunc(t1->tag[0], 0);
return func(t1->tag + CHAR_BYTES, t2->tag + CHAR_BYTES);
}
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
}
......@@ -821,16 +836,21 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
SExprInfo *pExpr = tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SExprInfo *pExpr = tscExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[0].i64 = tagColId;
pExpr->base.param[0].nLen = sizeof(int64_t);
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams = 1;
pExpr->base.numOfParams = 0; // the value is 0 by default. just make sure.
// add json tag key, if there is no json tag key, just hold place.
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pSupporter->tagCond.joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[pExpr->base.numOfParams].i64 = tagColId;
pExpr->base.param[pExpr->base.numOfParams].nLen = sizeof(int64_t);
pExpr->base.param[pExpr->base.numOfParams].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.numOfParams++;
}
// add the filter tag column
......@@ -2012,7 +2032,12 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
SExprInfo* pExpr = tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG, getNewResColId(pCmd));
if(strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName) > 0){
tVariantCreateFromBinary(&(pExpr->base.param[pExpr->base.numOfParams]), pTagCond->joinInfo.joinTables[0]->tagJsonKeyName,
strlen(pTagCond->joinInfo.joinTables[0]->tagJsonKeyName), TSDB_DATA_TYPE_BINARY);
pExpr->base.numOfParams++;
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug(
......@@ -2025,15 +2050,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL, getNewResColId(pCmd));
// set the tags value for ts_comp function
SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param->i64 = tagColId;
pExpr->base.numOfParams = 1;
}
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
......@@ -2427,6 +2443,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
tscInitQueryInfo(pNewQueryInfo);
pNewQueryInfo->isStddev = true; // for json tag
// add the group cond
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
......@@ -2490,6 +2507,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
}
SExprInfo* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG, getNewResColId(pCmd));
if (schema->type == TSDB_DATA_TYPE_JSON){
p->base.numOfParams = pExpr->base.numOfParams;
tVariantAssign(&p->base.param[0], &pExpr->base.param[0]);
}
p->base.resColId = pExpr->base.resColId;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
int32_t num = (int32_t) taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
......@@ -3681,7 +3702,9 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
if (pQueryInfo->isStddev && type == TSDB_DATA_TYPE_JSON){ // for json tag compare in the second round of stddev
pRes->tsrow[j] = pRes->urow[i];
}else if (!IS_VAR_DATA_TYPE(type) && type != TSDB_DATA_TYPE_JSON) {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
......
......@@ -29,6 +29,7 @@
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "cJSON.h"
#ifdef HTTP_EMBEDDED
#include "httpInt.h"
......@@ -109,6 +110,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
n = bufSize + escapeSize;
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
......@@ -734,34 +736,33 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar) {
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar, bool convertJson) {
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
if (pInfo->pExpr->base.param[0].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
assert(pInfo->pExpr->base.param[0].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[1].nLen);
memcpy(varDataVal(p), pInfo->pExpr->base.param[0].pz, pInfo->pExpr->base.param[0].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[0].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
memcpy(p, &pInfo->pExpr->base.param[0].i64, pInfo->field.bytes);
}
}
}
} else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
} else if (convertNchar && (pInfo->field.type == TSDB_DATA_TYPE_NCHAR)) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if(buffer == NULL)
return ;
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
......@@ -785,8 +786,63 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else if (pInfo->field.type == TSDB_DATA_TYPE_JSON) {
if (convertJson){
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
if (buffer == NULL) return;
pRes->buffer[i] = buffer;
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
char type = *p;
char* realData = p + CHAR_BYTES;
if (type == TSDB_DATA_TYPE_JSON && isNull(realData, TSDB_DATA_TYPE_JSON)) {
memcpy(dst, realData, varDataTLen(realData));
} else if (type == TSDB_DATA_TYPE_BINARY) {
assert(*(uint32_t*)varDataVal(realData) == TSDB_DATA_JSON_null); // json null value
assert(varDataLen(realData) == INT_BYTES);
sprintf(varDataVal(dst), "%s", "null");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_JSON) {
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), POINTER_SHIFT(varDataVal(dst), CHAR_BYTES));
*(char*)(POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES)) = '\"';
varDataSetLen(dst, length + CHAR_BYTES*2);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(realData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)realData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
assert(0);
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else{
// if convertJson is false, json data as raw data used for stddev for the second round
}
}
if (convertNchar) {
......@@ -794,6 +850,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
}
}
void tscJson2String(char *src, char* dst){
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) {
......@@ -807,11 +867,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true, true);
}
}
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson) {
assert(pRes->numOfCols > 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -822,7 +882,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, convertNchar);
setResRawPtrImpl(pRes, pInfo, i, convertNchar, convertJson);
/*
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.ColName.flag)) {
......@@ -878,17 +938,6 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
}
}
static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pTableCols, i);
pColInfo[i] = pCol->info;//[index].type;
}
return pColInfo;
}
typedef struct SDummyInputInfo {
SSDataBlock *block;
STableQueryInfo *pTableQueryInfo;
......@@ -1278,14 +1327,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar, convertJson);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
......@@ -1319,8 +1368,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
// handle the following query process
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
STableMeta* pTableMeta = tscGetMetaInfo(px, 0)->pTableMeta;
SSchema* pSchema = tscGetTableSchema(pTableMeta);
......@@ -1435,7 +1482,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
px->pQInfo->runtimeEnv.udfIsCopy = true;
px->pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
tfree(pColumnInfo);
tfree(schema);
// set the pRuntimeEnv for pSourceOperator
......@@ -1444,7 +1490,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px, pSql->self, false);
convertQueryResult(pOutput, px, pSql->self, false, false);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......@@ -3095,12 +3141,12 @@ void tscIncStreamExecutionCount(void* pStream) {
ps->num += 1;
}
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams) {
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
if (pTableMetaInfo->pTableMeta == NULL) {
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId <= TSDB_UD_COLUMN_INDEX) {
return true;
}
......@@ -5396,3 +5442,152 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p;
}
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId){
// set json NULL data
uint8_t nullTypeVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
uint32_t jsonNULL = TSDB_DATA_JSON_NULL;
int jsonIndex = startColId + 1;
char nullTypeKey[VARSTR_HEADER_SIZE + INT_BYTES] = {0};
varDataSetLen(nullTypeKey, INT_BYTES);
nullTypeVal[0] = TSDB_DATA_TYPE_JSON;
varDataSetLen(nullTypeVal + CHAR_BYTES, INT_BYTES);
*(uint32_t*)(varDataVal(nullTypeKey)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeKey, false); // add json null type
if (strtrim(json) == 0 || strcasecmp(json, "null") == 0){
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json null value
return TSDB_CODE_SUCCESS;
}
int32_t jsonNotNull = TSDB_DATA_JSON_NOT_NULL;
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNotNull;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, nullTypeVal, true); // add json type
// set json real data
cJSON *root = cJSON_Parse(json);
if (root == NULL){
tscError("json parse error");
return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL);
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root)){
tscError("json error invalide value");
return tscSQLSyntaxErrMsg(errMsg, "json error invalide value", NULL);
}
int retCode = 0;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
retCode = tscSQLSyntaxErrMsg(errMsg, "json inner error", NULL);
goto end;
}
char *jsonKey = item->string;
if(!isValidateTag(jsonKey)){
tscError("json key not validate");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key not validate", NULL);
goto end;
}
if(strlen(jsonKey) > TSDB_MAX_JSON_KEY_LEN){
tscError("json key too long error");
retCode = tscSQLSyntaxErrMsg(errMsg, "json key too long, more than 256", NULL);
goto end;
}
if(strlen(jsonKey) == 0 || taosHashGet(keyHash, jsonKey, strlen(jsonKey)) != NULL){
continue;
}
// json key encode by binary
char tagKey[TSDB_MAX_JSON_KEY_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(tagKey), jsonKey, strlen(jsonKey));
int32_t outLen = (int32_t)strlen(jsonKey);
taosHashPut(keyHash, jsonKey, outLen, &outLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
varDataSetLen(tagKey, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagKey, false); // add json key
if(item->type == cJSON_String){ // add json value format: type|data
char *jsonValue = item->valuestring;
outLen = 0;
char tagVal[TSDB_MAX_JSON_TAGS_LEN] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if (!taosMbsToUcs4(jsonValue, strlen(jsonValue), varDataVal(tagData),
TSDB_MAX_JSON_TAGS_LEN - CHAR_BYTES - VARSTR_HEADER_SIZE, &outLen)) {
tscError("json string error:%s|%s", strerror(errno), jsonValue);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagData, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true);
}else if(item->type == cJSON_Number){
char tagVal[LONG_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType(item->valuedouble, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if(*tagVal == TSDB_DATA_TYPE_DOUBLE) *((double *)tagData) = item->valuedouble;
else if(*tagVal == TSDB_DATA_TYPE_BIGINT) *((int64_t *)tagData) = item->valueint;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal, true);
}else if(item->type == cJSON_True || item->type == cJSON_False){
char tagVal[CHAR_BYTES + CHAR_BYTES] = {0};
*tagVal = jsonType2DbType((double)(item->valueint), item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
*tagData = (char)(item->valueint);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BOOL, tagVal, true);
}else if(item->type == cJSON_NULL){
char tagVal[CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES] = {0};
*tagVal = jsonType2DbType(0, item->type); // type
int32_t* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
varDataSetLen(tagData, INT_BYTES);
*(uint32_t*)(varDataVal(tagData)) = TSDB_DATA_JSON_null;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BINARY, tagVal, true);
}
else{
retCode = tscSQLSyntaxErrMsg(errMsg, "invalidate json value", NULL);
goto end;
}
}
if(taosHashGetSize(keyHash) == 0){ // set json NULL true
*(uint32_t*)(varDataVal(nullTypeVal + CHAR_BYTES)) = jsonNULL;
memcpy(POINTER_SHIFT(kvRowBuilder->buf, kvRowBuilder->pColIdx[2].offset), nullTypeVal, CHAR_BYTES + VARSTR_HEADER_SIZE + INT_BYTES);
}
end:
taosHashCleanup(keyHash);
cJSON_Delete(root);
return retCode;
}
int8_t jsonType2DbType(double data, int jsonType){
switch(jsonType){
case cJSON_Number:
if (data - (int64_t)data > 0) return TSDB_DATA_TYPE_DOUBLE; else return TSDB_DATA_TYPE_BIGINT;
case cJSON_String:
return TSDB_DATA_TYPE_NCHAR;
case cJSON_NULL:
return TSDB_DATA_TYPE_BINARY;
case cJSON_True:
case cJSON_False:
return TSDB_DATA_TYPE_BOOL;
}
return TSDB_DATA_TYPE_NULL;
}
// get key from json->'key'
void getJsonKey(SStrToken *t0){
while(true){
t0->n = tGetToken(t0->z, &t0->type);
if (t0->type == TK_STRING){
t0->z++;
t0->n -= 2;
break;
}else if (t0->type == TK_ILLEGAL){
assert(0);
}
t0->z += t0->n;
}
}
......@@ -547,7 +547,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value, bool isJumpJsonVType) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
......@@ -560,9 +560,14 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
pBuilder->nCols++;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
char* jumpType = (char*)value;
if(isJumpJsonVType) jumpType += CHAR_BYTES;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(jumpType) : TYPE_BYTES[type];
if(isJumpJsonVType) tlen += CHAR_BYTES; // add type size
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
assert(pBuilder->alloc > 0);
pBuilder->alloc *= 2;
}
void* buf = realloc(pBuilder->buf, pBuilder->alloc);
......
......@@ -25,7 +25,7 @@ extern "C" {
// variant, each number/string/field_id has a corresponding struct during parsing sql
typedef struct tVariant {
uint32_t nType;
int32_t nType; // change uint to int, because in tVariantCreate() pVar->nType = -1; // -1 means error type
int32_t nLen; // only used for string, for number, it is useless
union {
int64_t i64;
......
......@@ -247,6 +247,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
tfree(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
tfree(pNode->pSchema);
} else if (pNode->nodeType == TSQL_NODE_FUNC) {
......@@ -255,7 +256,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
tfree(pNode->pType);
}
free(pNode);
tfree(pNode);
}
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
......@@ -272,7 +273,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
} else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy((*pExpr)->pVal);
free((*pExpr)->pVal);
tfree((*pExpr)->pVal);
} else if ((*pExpr)->nodeType == TSQL_NODE_COL) {
free((*pExpr)->pSchema);
} else if ((*pExpr)->nodeType == TSQL_NODE_FUNC) {
......@@ -284,7 +285,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
tfree((*pExpr)->pType);
}
free(*pExpr);
tfree(*pExpr);
*pExpr = NULL;
}
......@@ -502,7 +503,7 @@ static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
tVariant* pVal = expr->pVal;
tbufWriteUint32(bw, pVal->nType);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufWriteInt32(bw, pVal->nLen);
tbufWrite(bw, pVal->pz, pVal->nLen);
} else {
......@@ -571,7 +572,7 @@ static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
pExpr->pVal = pVal;
pVal->nType = tbufReadUint32(br);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
if (pVal->nType == TSDB_DATA_TYPE_BINARY || pVal->nType == TSDB_DATA_TYPE_NCHAR) {
tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen));
pVal->pz = calloc(1, pVal->nLen + 1);
tbufReadToBuffer(br, pVal->pz, pVal->nLen);
......
......@@ -4,6 +4,7 @@
#include "tname.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tglobal.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
......@@ -251,6 +252,9 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
int32_t rowLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
if (pSchema[i].type == TSDB_DATA_TYPE_JSON && numOfCols != 1){
return false;
}
// 1. valid types
if (!isValidDataType(pSchema[i].type)) {
return false;
......@@ -301,8 +305,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
return false;
}
int32_t maxTagLen = TSDB_MAX_TAGS_LEN;
if (numOfTags == 1 && pSchema[numOfCols].type == TSDB_DATA_TYPE_JSON){
maxTagLen = TSDB_MAX_JSON_TAGS_LEN;
}
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, maxTagLen)) {
return false;
}
......
......@@ -18,7 +18,7 @@
#include "ttokendef.h"
#include "tscompression.h"
const int32_t TYPE_BYTES[15] = {
const int32_t TYPE_BYTES[16] = {
-1, // TSDB_DATA_TYPE_NULL
sizeof(int8_t), // TSDB_DATA_TYPE_BOOL
sizeof(int8_t), // TSDB_DATA_TYPE_TINYINT
......@@ -34,6 +34,7 @@ const int32_t TYPE_BYTES[15] = {
sizeof(uint16_t), // TSDB_DATA_TYPE_USMALLINT
sizeof(uint32_t), // TSDB_DATA_TYPE_UINT
sizeof(uint64_t), // TSDB_DATA_TYPE_UBIGINT
sizeof(int8_t), // TSDB_DATA_TYPE_JSON
};
#define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \
......@@ -367,8 +368,8 @@ static void getStatics_nchr(const void *pData, int32_t numOfRow, int64_t *min, i
*maxIndex = 0;
}
tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
tDataTypeDescriptor tDataTypes[16] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
......@@ -376,13 +377,14 @@ tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_u64},
{TSDB_DATA_TYPE_JSON,4, TSDB_MAX_JSON_TAGS_LEN, "JSON", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
};
char tTokenTypeSwitcher[13] = {
......@@ -428,7 +430,7 @@ FORCE_INLINE void* getDataMax(int32_t type) {
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_JSON;
}
void setVardataNull(void* val, int32_t type) {
......@@ -438,6 +440,9 @@ void setVardataNull(void* val, int32_t type) {
} else if (type == TSDB_DATA_TYPE_NCHAR) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_NCHAR_NULL;
} else if (type == TSDB_DATA_TYPE_JSON) {
varDataSetLen(val, sizeof(int32_t));
*(uint32_t*) varDataVal(val) = TSDB_DATA_JSON_NULL;
} else {
assert(0);
}
......@@ -505,6 +510,7 @@ void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems) {
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_JSON:
for (int32_t i = 0; i < numOfElems; ++i) {
setVardataNull(POINTER_SHIFT(val, i * bytes), type);
}
......
......@@ -158,7 +158,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
pVar->dKey = GET_FLOAT_VAL(pz);
break;
}
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
case TSDB_DATA_TYPE_NCHAR:{ // here we get the nchar length from raw binary bits length
size_t lenInwchar = len / TSDB_NCHAR_SIZE;
pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
......@@ -167,7 +167,13 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break;
}
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
case TSDB_DATA_TYPE_JSON:{
pVar->pz = calloc(len + 2, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
break;
}
case TSDB_DATA_TYPE_BINARY:{
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
......@@ -185,7 +191,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
void tVariantDestroy(tVariant *pVar) {
if (pVar == NULL) return;
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR || pVar->nType == TSDB_DATA_TYPE_JSON) {
tfree(pVar->pz);
pVar->nLen = 0;
}
......@@ -210,11 +216,41 @@ bool tVariantIsValid(tVariant *pVar) {
return isValidDataType(pVar->nType);
}
bool tVariantTypeMatch(tVariant *pVar, int8_t dbType){
switch (dbType) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
if(pVar->nType != TSDB_DATA_TYPE_BINARY && pVar->nType != TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:{
if(pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR){
return false;
}
break;
}
}
return true;
}
void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return;
pDst->nType = pSrc->nType;
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR || pSrc->nType == TSDB_DATA_TYPE_JSON) {
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
char* p = realloc(pDst->pz, len);
assert(p);
......@@ -249,7 +285,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
}
}
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) {
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY && isValidDataType(pDst->nType)) { // if pDst->nType=-1, core dump. eg: where intcolumn=999999999999999999999999999
pDst->nLen = tDataTypes[pDst->nType].bytes;
}
}
......@@ -267,7 +303,7 @@ int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
return 1;
}
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR) {
if (p1->nType == TSDB_DATA_TYPE_BINARY || p1->nType == TSDB_DATA_TYPE_NCHAR || p1->nType == TSDB_DATA_TYPE_JSON) {
if (p1->nLen == p2->nLen) {
return memcmp(p1->pz, p2->pz, p1->nLen);
} else {
......@@ -815,7 +851,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:{
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint8_t*) payload = TSDB_DATA_BINARY_NULL;
......@@ -852,7 +888,7 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:{
int32_t newlen = 0;
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
......@@ -888,6 +924,16 @@ int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool inc
break;
}
case TSDB_DATA_TYPE_JSON:{
if (pVariant->nType == TSDB_DATA_TYPE_BINARY){
*((int8_t *)payload) = TSDB_DATA_JSON_PLACEHOLDER;
} else if (pVariant->nType == TSDB_DATA_TYPE_JSON){ // select * from stable, set tag type to json,from setTagValue/tag_project_function
memcpy(payload, pVariant->pz, pVariant->nLen);
}else {
return -1;
}
break;
}
}
return 0;
......
......@@ -11,6 +11,11 @@ import java.util.Map;
public abstract class AbstractResultSet extends WrapperImpl implements ResultSet {
private int fetchSize;
protected boolean wasNull;
protected int timestampPrecision;
public void setTimestampPrecision(int timestampPrecision) {
this.timestampPrecision = timestampPrecision;
}
protected void checkAvailability(int columnIndex, int bounds) throws SQLException {
if (isClosed())
......
......@@ -74,9 +74,8 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
public boolean next() throws SQLException {
if (this.getBatchFetch()) {
if (this.blockData.forward()) {
if (this.blockData.forward())
return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.reset();
......@@ -214,7 +213,18 @@ public class TSDBResultSet extends AbstractResultSet implements ResultSet {
if (!lastWasNull) {
Object value = this.rowData.getObject(columnIndex);
if (value instanceof Timestamp) {
res = ((Timestamp) value).getTime();
Timestamp ts = (Timestamp) value;
long epochSec = ts.getTime() / 1000;
long nanoAdjustment = ts.getNanos();
switch (this.timestampPrecision) {
case 0:
default: // ms
return ts.getTime();
case 1: // us
return epochSec * 1000_000L + nanoAdjustment / 1000L;
case 2: // ns
return epochSec * 1000_000_000L + nanoAdjustment;
}
} else {
int nativeType = this.columnMetaDataList.get(columnIndex - 1).getColType();
res = this.rowData.getLong(columnIndex, nativeType);
......
......@@ -47,6 +47,8 @@ public class TSDBStatement extends AbstractStatement {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY);
}
TSDBResultSet res = new TSDBResultSet(this, this.connection.getConnector(), pSql);
int timestampPrecision = this.connection.getConnector().getResultTimePrecision(pSql);
res.setTimestampPrecision(timestampPrecision);
res.setBatchFetch(this.connection.getBatchFetch());
return res;
}
......
package com.taosdata.jdbc.enums;
public enum TimestampPrecision {
MS,
US,
NS,
UNKNOWN
public class TimestampPrecision {
public static final int MS = 0;
public static final int US = 1;
public static final int NS = 2;
}
......@@ -168,11 +168,22 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case TIMESTAMP: {
Long value = row.getLong(colIndex);
//TODO: this implementation has bug if the timestamp bigger than 9999_9999_9999_9
if (value < 1_0000_0000_0000_0L)
if (value < 1_0000_0000_0000_0L) {
this.timestampPrecision = TimestampPrecision.MS;
return new Timestamp(value);
long epochSec = value / 1000_000L;
long nanoAdjustment = value % 1000_000L * 1000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
if (value >= 1_0000_0000_0000_0L && value < 1_000_000_000_000_000_0l) {
this.timestampPrecision = TimestampPrecision.US;
long epochSec = value / 1000_000L;
long nanoAdjustment = value % 1000_000L * 1000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
if (value >= 1_000_000_000_000_000_0l) {
this.timestampPrecision = TimestampPrecision.NS;
long epochSec = value / 1000_000_000L;
long nanoAdjustment = value % 1000_000_000L;
return Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
}
}
case UTC: {
String value = row.getString(colIndex);
......@@ -182,12 +193,15 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
if (value.length() > 31) {
// ns timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSSSSS+0x00
nanoAdjustment = fractionalSec;
this.timestampPrecision = TimestampPrecision.NS;
} else if (value.length() > 28) {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSSSSS+0x00
nanoAdjustment = fractionalSec * 1000L;
this.timestampPrecision = TimestampPrecision.US;
} else {
// ms timestamp: yyyy-MM-ddTHH:mm:ss.SSS+0x00
nanoAdjustment = fractionalSec * 1000_000L;
this.timestampPrecision = TimestampPrecision.MS;
}
ZoneOffset zoneOffset = ZoneOffset.of(value.substring(value.length() - 5));
Instant instant = Instant.ofEpochSecond(epochSec, nanoAdjustment).atOffset(zoneOffset).toInstant();
......@@ -196,7 +210,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
case STRING:
default: {
String value = row.getString(colIndex);
TimestampPrecision precision = Utils.guessTimestampPrecision(value);
int precision = Utils.guessTimestampPrecision(value);
this.timestampPrecision = precision;
if (precision == TimestampPrecision.MS) {
// ms timestamp: yyyy-MM-dd HH:mm:ss.SSS
return row.getTimestamp(colIndex);
......@@ -338,8 +354,18 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
wasNull = value == null;
if (value == null)
return 0;
if (value instanceof Timestamp)
return ((Timestamp) value).getTime();
if (value instanceof Timestamp) {
Timestamp ts = (Timestamp) value;
switch (this.timestampPrecision) {
case TimestampPrecision.MS:
default:
return ts.getTime();
case TimestampPrecision.US:
return ts.getTime() * 1000 + ts.getNanos() / 1000 % 1000;
case TimestampPrecision.NS:
return ts.getTime() * 1000_000 + ts.getNanos() % 1000_000;
}
}
long valueAsLong = 0;
try {
valueAsLong = Long.parseLong(value.toString());
......
......@@ -194,14 +194,14 @@ public class Utils {
return timestamp.toLocalDateTime().format(milliSecFormatter);
}
public static TimestampPrecision guessTimestampPrecision(String value) {
public static int guessTimestampPrecision(String value) {
if (isMilliSecFormat(value))
return TimestampPrecision.MS;
if (isMicroSecFormat(value))
return TimestampPrecision.US;
if (isNanoSecFormat(value))
return TimestampPrecision.NS;
return TimestampPrecision.UNKNOWN;
return TimestampPrecision.MS;
}
private static boolean isMilliSecFormat(String timestampStr) {
......
package com.taosdata.jdbc.cases;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
public class GetLongWithDifferentTimestampPrecision {
private final String host = "127.0.0.1";
@Test
public void testRestful() throws SQLException {
// given
String url = "jdbc:TAOS-RS://" + host + ":6041/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
@Test
public void testJni() throws SQLException {
// given
String url = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(url, "root", "taosdata");
long ts = System.currentTimeMillis();
// when and then
assertResultSet(conn, "ms", ts, ts);
assertResultSet(conn, "us", ts, ts * 1000);
assertResultSet(conn, "ns", ts, ts * 1000_000);
}
private void assertResultSet(Connection conn, String precision, long timestamp, long expect) throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test precision '" + precision + "'");
stmt.execute("create table test.weather(ts timestamp, f1 int)");
String dateTimeStr = sdf.format(new Date(timestamp));
stmt.execute("insert into test.weather values('" + dateTimeStr + "', 1)");
ResultSet rs = stmt.executeQuery("select * from test.weather");
rs.next();
long actual = rs.getLong("ts");
Assert.assertEquals(expect, actual);
stmt.execute("drop database if exists test");
}
}
}
......@@ -9,29 +9,29 @@ import java.util.Random;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class RestfulJDBCTest {
// private static final String host = "127.0.0.1";
private static final String host = "master";
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static Connection connection;
private static final String dbname = "restful_test";
@Test
public void testCase001() throws SQLException {
// given
String sql = "drop database if exists restful_test";
String sql = "drop database if exists " + dbname;
// when
boolean execute = execute(connection, sql);
// then
Assert.assertFalse(execute);
// given
sql = "create database if not exists restful_test";
sql = "create database if not exists " + dbname;
// when
execute = execute(connection, sql);
// then
Assert.assertFalse(execute);
// given
sql = "use restful_test";
sql = "use " + dbname;
// when
execute = execute(connection, sql);
// then
......@@ -41,7 +41,7 @@ public class RestfulJDBCTest {
@Test
public void testCase002() throws SQLException {
// given
String sql = "create table weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
String sql = "create table " + dbname + ".weather(ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
// when
boolean execute = execute(connection, sql);
// then
......@@ -52,7 +52,7 @@ public class RestfulJDBCTest {
public void testCase004() throws SQLException {
for (int i = 1; i <= 100; i++) {
// given
String sql = "create table t" + i + " using weather tags('beijing', '" + i + "')";
String sql = "create table " + dbname + ".t" + i + " using " + dbname + ".weather tags('beijing', '" + i + "')";
// when
boolean execute = execute(connection, sql);
// then
......@@ -68,7 +68,7 @@ public class RestfulJDBCTest {
// given
long currentTimeMillis = System.currentTimeMillis();
String sql = "insert into t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")";
String sql = "insert into " + dbname + ".t" + j + " values(" + currentTimeMillis + "," + (random.nextFloat() * 50) + "," + random.nextInt(100) + ")";
// when
int affectRows = executeUpdate(connection, sql);
// then
......@@ -83,7 +83,7 @@ public class RestfulJDBCTest {
@Test
public void testCase006() throws SQLException {
// given
String sql = "select * from weather";
String sql = "select * from " + dbname + ".weather";
// when
ResultSet rs = executeQuery(connection, sql);
ResultSetMetaData meta = rs.getMetaData();
......@@ -102,7 +102,7 @@ public class RestfulJDBCTest {
@Test
public void testCase007() throws SQLException {
// given
String sql = "drop database restful_test";
String sql = "drop database " + dbname;
// when
boolean execute = execute(connection, sql);
......@@ -143,7 +143,7 @@ public class RestfulJDBCTest {
public static void afterClass() throws SQLException {
if (connection != null) {
Statement stmt = connection.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("drop database if exists " + dbname);
stmt.close();
connection.close();
}
......
......@@ -17,12 +17,25 @@ import java.text.SimpleDateFormat;
public class RestfulResultSetTest {
// private static final String host = "127.0.0.1";
private static final String host = "master";
private static final String host = "127.0.0.1";
private static Connection conn;
private static Statement stmt;
private static ResultSet rs;
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@Test
public void wasNull() throws SQLException {
Assert.assertFalse(rs.wasNull());
......@@ -658,20 +671,6 @@ public class RestfulResultSetTest {
Assert.assertTrue(rs.isWrapperFor(RestfulResultSet.class));
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata");
stmt = conn.createStatement();
stmt.execute("drop database if exists restful_test");
stmt.execute("create database if not exists restful_test");
stmt.execute("use restful_test");
stmt.execute("drop table if exists weather");
stmt.execute("create table if not exists weather(f1 timestamp, f2 int, f3 bigint, f4 float, f5 double, f6 binary(64), f7 smallint, f8 tinyint, f9 bool, f10 nchar(64))");
stmt.execute("insert into restful_test.weather values('2021-01-01 00:00:00.000', 1, 100, 3.1415, 3.1415926, 'abc', 10, 10, true, '涛思数据')");
rs = stmt.executeQuery("select * from restful_test.weather");
rs.next();
}
@AfterClass
public static void afterClass() throws SQLException {
if (rs != null)
......
......@@ -7,7 +7,7 @@ import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result = TaosResult(c_void_p(p_result))
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
......@@ -18,18 +18,21 @@ def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
print("drop if exists")
conn.execute("drop database if exists %s" % dbname)
print("create database")
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("create table")
# conn.execute("use %s" % dbname)
conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
time.sleep(0.7)
# sub.close()
sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
......
......@@ -124,6 +124,21 @@ class TaosBind(ctypes.Structure):
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def json(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_JSON
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
......@@ -356,6 +371,11 @@ class TaosMultiBind(ctypes.Structure):
self.buffer_type = FieldType.C_NCHAR
self._str_to_buffer(values)
def json(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_JSON
self._str_to_buffer(values)
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
......
......@@ -110,7 +110,7 @@ _libtaos.taos_get_client_info.restype = c_char_p
def taos_get_client_info():
# type: () -> str
"""Get client version info."""
return _libtaos.taos_get_client_info().decode()
return _libtaos.taos_get_client_info().decode("utf-8")
_libtaos.taos_get_server_info.restype = c_char_p
......@@ -120,7 +120,7 @@ _libtaos.taos_get_server_info.argtypes = (c_void_p,)
def taos_get_server_info(connection):
# type: (c_void_p) -> str
"""Get server version as string."""
return _libtaos.taos_get_server_info(connection).decode()
return _libtaos.taos_get_server_info(connection).decode("utf-8")
_libtaos.taos_close.restype = None
......@@ -308,16 +308,14 @@ def taos_subscribe(connection, restart, topic, sql, interval, callback=None, par
"""
if callback != None:
callback = subscribe_callback_type(callback)
if param != None:
param = c_void_p(param)
return c_void_p(
_libtaos.taos_subscribe(
connection,
1 if restart else 0,
c_char_p(topic.encode("utf-8")),
c_char_p(sql.encode("utf-8")),
callback or None,
param,
callback,
c_void_p(param),
interval,
)
)
......
......@@ -25,6 +25,7 @@ class FieldType(object):
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
C_JSON = 15
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
......
......@@ -188,6 +188,9 @@ class TaosCursor(object):
if dataType.upper() == "NCHAR":
if self._description[col][1] == FieldType.C_NCHAR:
return True
if dataType.upper() == "JSON":
if self._description[col][1] == FieldType.C_JSON:
return True
return False
......
......@@ -144,7 +144,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode())
res.append(tmpstr.value.decode("utf-8"))
else:
res.append(
(
......@@ -172,7 +172,7 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=Field
if rbyte == 1 and buffer[0] == b'\xff':
res.append(None)
else:
res.append(cast(buffer, c_char_p).value.decode())
res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res
......@@ -188,7 +188,7 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldT
if rbyte == 4 and buffer[:4] == b'\xff'*4:
res.append(None)
else:
res.append(cast(buffer, c_char_p).value.decode())
res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res
......@@ -207,6 +207,7 @@ CONVERT_FUNC = {
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python,
}
CONVERT_FUNC_BLOCK = {
......@@ -224,6 +225,7 @@ CONVERT_FUNC_BLOCK = {
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python_block,
}
# Corresponding TAOS_FIELD structure in C
......
......@@ -3,6 +3,8 @@ from .cinterface import *
# from .connection import TaosConnection
from .error import *
from ctypes import c_void_p
class TaosResult(object):
"""TDengine result interface"""
......@@ -12,7 +14,11 @@ class TaosResult(object):
# to make the __del__ order right
self._conn = conn
self._close_after = close_after
self._result = result
if isinstance(result, c_void_p):
self._result = result
else:
self._result = c_void_p(result)
self._fields = None
self._field_count = None
self._precision = None
......
......@@ -20,7 +20,8 @@ def stream_callback(p_param, p_result, p_row):
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
ts, count = row.as_tuple()
print(ts, count)
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
......
......@@ -63,7 +63,7 @@ def test_subscribe(conn):
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("callback")
result = TaosResult(p_result)
result = TaosResult(c_void_p(p_result))
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
......@@ -76,7 +76,7 @@ def test_subscribe_callback(conn):
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("use %s" % dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
......
......@@ -170,7 +170,6 @@ int32_t dnodeInitSystem() {
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump();
dnodeInitTmr();
if (dnodeCreateDir(tsLogDir) < 0) {
......@@ -190,6 +189,7 @@ int32_t dnodeInitSystem() {
return -1;
}
taosSetCoreDump();
dInfo("start to initialize TDengine");
taosInitNotes();
......
......@@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->pCont == NULL) return;
if (pMsg->msgType >= TSDB_MSG_TYPE_MAX) {
dError("RPC %p, shell msg type:%d is not processed", pMsg->handle, pMsg->msgType);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return;
}
SRunStatus dnodeStatus = dnodeGetRunStatus();
if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) {
dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]);
......
......@@ -46,6 +46,7 @@ typedef void **TAOS_ROW;
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string
typedef enum {
TSDB_OPTION_LOCALE,
......
......@@ -39,7 +39,7 @@ extern "C" {
#define TSKEY_INITIAL_VAL INT64_MIN
// Bytes for each type.
extern const int32_t TYPE_BYTES[15];
extern const int32_t TYPE_BYTES[16];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
......@@ -70,6 +70,11 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF
#define TSDB_DATA_BINARY_NULL 0xFF
#define TSDB_DATA_JSON_PLACEHOLDER 0x7F
#define TSDB_DATA_JSON_NULL 0xFFFFFFFF
#define TSDB_DATA_JSON_null 0xFFFFFFFE
#define TSDB_DATA_JSON_NOT_NULL 0x01
#define TSDB_DATA_JSON_CAN_NOT_COMPARE 0x7FFFFFFF
#define TSDB_DATA_UTINYINT_NULL 0xFF
#define TSDB_DATA_USMALLINT_NULL 0xFFFF
......@@ -176,6 +181,9 @@ do { \
#define TSDB_RELATION_MATCH 14
#define TSDB_RELATION_NMATCH 15
#define TSDB_RELATION_CONTAINS 16
#define TSDB_RELATION_ARROW 17
#define TSDB_BINARY_OP_ADD 30
#define TSDB_BINARY_OP_SUBTRACT 31
#define TSDB_BINARY_OP_MULTIPLY 32
......@@ -222,8 +230,11 @@ do { \
*/
#define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_JSON_TAGS_LEN (4096*TSDB_NCHAR_SIZE + 2 + 1) // 2->var_header_len 1->type
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_JSON_KEY_LEN 256
#define TSDB_MAX_JSON_KEY_MD5_LEN 16
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
......
......@@ -250,48 +250,48 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state"
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID"
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601) //"Invalid table type"
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602) //"Invalid table schema version"
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603) //"Table already exists"
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604) //"Invalid configuration"
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605) //"Tsdb init failed"
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606) //"No diskspace for tsdb"
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607) //"No permission for disk files"
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608) //"Data file(s) corrupted"
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609) //"Out of memory"
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A) //"Tag too old"
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B) //"Timestamp data out of range"
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C) //"Submit message is messed up"
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D) //"Invalid operation"
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E) //"Invalid creation of table"
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F) //"No table data in memory skiplist"
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610) //"File already exists"
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611) //"Need to reconfigure table"
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612) //"Invalid information to create table"
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk"
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message"
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value"
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data"
#define TSDB_CODE_TDB_INCOMPLETE_DFILESET TAOS_DEF_ERROR_CODE(0, 0x0617) //"TSDB incomplete DFileSet"
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601) //"Invalid table type")
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602) //"Invalid table schema version")
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603) //"Table already exists")
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604) //"Invalid configuration")
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605) //"Tsdb init failed")
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606) //"No diskspace for tsdb")
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607) //"No permission for disk files")
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608) //"Data file(s) corrupted")
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609) //"Out of memory")
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A) //"Tag too old")
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B) //"Timestamp data out of range")
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C) //"Submit message is messed up")
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D) //"Invalid operation")
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E) //"Invalid creation of table")
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F) //"No table data in memory skiplist")
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610) //"File already exists")
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611) //"Need to reconfigure table")
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612) //"Invalid information to create table")
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk")
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data")
#define TSDB_CODE_TDB_INCOMPLETE_DFILESET TAOS_DEF_ERROR_CODE(0, 0x0617) //"TSDB incomplete DFileSet")
#define TSDB_CODE_TDB_NO_JSON_TAG_KEY TAOS_DEF_ERROR_CODE(0, 0x0618) //"TSDB no tag json key")
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle"
#define TSDB_CODE_QRY_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0701) //"Invalid message" // failed to validate the sql expression msg by vnode
#define TSDB_CODE_QRY_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0702) //"No diskspace for query"
#define TSDB_CODE_QRY_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0703) //"System out of memory"
#define TSDB_CODE_QRY_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0704) //"Unexpected generic error in query"
#define TSDB_CODE_QRY_DUP_JOIN_KEY TAOS_DEF_ERROR_CODE(0, 0x0705) //"Duplicated join key"
#define TSDB_CODE_QRY_EXCEED_TAGS_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0706) //"Tag condition too many"
#define TSDB_CODE_QRY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0707) //"Query not ready"
#define TSDB_CODE_QRY_HAS_RSP TAOS_DEF_ERROR_CODE(0, 0x0708) //"Query should response"
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query"
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query"
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached"
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica"
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error"
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition"
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
#define TSDB_CODE_QRY_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0701) //"Invalid message") // failed to validate the sql expression msg by vnode
#define TSDB_CODE_QRY_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0702) //"No diskspace for query")
#define TSDB_CODE_QRY_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0703) //"System out of memory")
#define TSDB_CODE_QRY_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0704) //"Unexpected generic error in query")
#define TSDB_CODE_QRY_DUP_JOIN_KEY TAOS_DEF_ERROR_CODE(0, 0x0705) //"Duplicated join key")
#define TSDB_CODE_QRY_EXCEED_TAGS_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0706) //"Tag condition too many")
#define TSDB_CODE_QRY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0707) //"Query not ready")
#define TSDB_CODE_QRY_HAS_RSP TAOS_DEF_ERROR_CODE(0, 0x0708) //"Query should response")
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query")
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
......@@ -230,6 +230,7 @@ typedef struct SSubmitBlk {
// Submit message for this TSDB
typedef struct SSubmitMsg {
SMsgHead header;
int8_t extend;
int32_t length;
int32_t numOfBlocks;
char blocks[];
......@@ -243,6 +244,7 @@ typedef struct {
} SShellSubmitRspBlock;
typedef struct {
int8_t extend;
int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written
......@@ -278,6 +280,7 @@ typedef struct {
} SMDCreateTableMsg;
typedef struct {
int8_t extend;
int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -290,11 +293,13 @@ typedef struct {
} SCreateTableMsg;
typedef struct {
int8_t extend;
int32_t numOfTables;
int32_t contLen;
} SCMCreateTableMsg;
typedef struct {
int8_t extend;
char name[TSDB_TABLE_FNAME_LEN];
// if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table
int8_t supertable;
......@@ -302,6 +307,7 @@ typedef struct {
} SCMDropTableMsg;
typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */
......@@ -314,6 +320,7 @@ typedef struct {
typedef struct {
SMsgHead head;
int8_t extend;
int64_t uid;
int32_t tid;
int16_t tversion;
......@@ -327,6 +334,7 @@ typedef struct {
} SUpdateTableTagValMsg;
typedef struct {
int8_t extend;
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN];
......@@ -335,6 +343,7 @@ typedef struct {
} SConnectMsg;
typedef struct {
int8_t extend;
char acctId[TSDB_ACCT_ID_LEN];
char serverVersion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN];
......@@ -361,16 +370,19 @@ typedef struct {
} SAcctCfg;
typedef struct {
int8_t extend;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg;
typedef struct {
char user[TSDB_USER_LEN];
int8_t extend;
char user[TSDB_USER_LEN];
} SDropUserMsg, SDropAcctMsg;
typedef struct {
int8_t extend;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
int8_t privilege;
......@@ -400,7 +412,7 @@ typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char name[TSDB_COL_NAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_MAX_JSON_KEY_LEN + 4 + 1]; // 4 meams ->'' for json tag
} SColIndex;
typedef struct SColumnFilterInfo {
......@@ -462,6 +474,7 @@ typedef struct {
typedef struct {
SMsgHead head;
int8_t extend;
char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not
......@@ -514,6 +527,7 @@ typedef struct {
} SQueryTableMsg;
typedef struct {
int8_t extend;
int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle
} SQueryTableRsp;
......@@ -521,11 +535,13 @@ typedef struct {
// todo: the show handle should be replaced with id
typedef struct {
SMsgHead header;
int8_t extend;
union{uint64_t qhandle; uint64_t qId;}; // query handle
uint16_t free;
} SRetrieveTableMsg;
typedef struct SRetrieveTableRsp {
int8_t extend;
int32_t numOfRows;
int8_t completed; // all results are returned to client
int16_t precision;
......@@ -551,6 +567,7 @@ typedef struct {
} SVnodeLoad;
typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; //MB
int32_t totalBlocks;
......@@ -577,6 +594,7 @@ typedef struct {
} SCreateDbMsg, SAlterDbMsg;
typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX];
int32_t funcType;
......@@ -588,11 +606,13 @@ typedef struct {
} SCreateFuncMsg;
typedef struct {
int8_t extend;
int32_t num;
char name[];
} SRetrieveFuncMsg;
typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN];
int32_t funcType;
int8_t resType;
......@@ -603,15 +623,18 @@ typedef struct {
} SFunctionInfoMsg;
typedef struct {
int8_t extend;
int32_t num;
char content[];
} SUdfFuncMsg;
typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg;
typedef struct {
int8_t extend;
char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg;
......@@ -744,12 +767,14 @@ typedef struct {
} SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag;
char tags[];
} STableInfoMsg;
typedef struct {
int8_t extend;
uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups;
int32_t numOfTables;
......@@ -758,21 +783,25 @@ typedef struct {
} SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg {
int8_t extend;
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct {
int8_t extend;
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg, SVgroupInfo;
typedef struct {
int8_t extend;
int32_t numOfVgroups;
SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo;
typedef struct STableMetaMsg {
int8_t extend;
int32_t contLen;
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
uint8_t numOfTags;
......@@ -792,6 +821,7 @@ typedef struct STableMetaMsg {
} STableMetaMsg;
typedef struct SMultiTableMeta {
int8_t extend;
int32_t numOfTables;
int32_t numOfVgroup;
int32_t numOfUdf;
......@@ -814,6 +844,7 @@ typedef struct {
* payloadLen is the length of payload
*/
typedef struct {
int8_t extend;
int8_t type;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
uint16_t payloadLen;
......@@ -821,17 +852,20 @@ typedef struct {
} SShowMsg;
typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
typedef struct SShowRsp {
int8_t extend;
uint64_t qhandle;
STableMetaMsg tableMeta;
} SShowRsp;
typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCreateDnodeMsg, SDropDnodeMsg;
......@@ -853,6 +887,7 @@ typedef struct {
} SConfigVnodeMsg;
typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port
char config[64];
} SCfgDnodeMsg;
......@@ -884,6 +919,7 @@ typedef struct {
} SStreamDesc;
typedef struct {
int8_t extend;
char clientVer[TSDB_VERSION_LEN];
uint32_t connId;
int32_t pid;
......@@ -894,6 +930,7 @@ typedef struct {
} SHeartBeatMsg;
typedef struct {
int8_t extend;
uint32_t queryId;
uint32_t streamId;
uint32_t totalDnodes;
......@@ -904,10 +941,12 @@ typedef struct {
} SHeartBeatRsp;
typedef struct {
int8_t extend;
char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnMsg;
typedef struct {
int8_t extend;
int32_t vnode;
int32_t sid;
uint64_t uid;
......@@ -932,6 +971,16 @@ typedef struct {
char reserved2[64];
} SStartupStep;
typedef struct {
int16_t type;
int32_t len;
char value[];
} STLV;
enum {
TLV_TYPE_DUMMY = 1,
};
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -118,7 +118,7 @@ typedef struct {
void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
......@@ -418,10 +418,14 @@ int tsdbCompact(STsdbRepo *pRepo);
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
// for json tag
void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, int16_t bytes);
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
char* parseTagDatatoJson(void *p);
#ifdef __cplusplus
}
#endif
......
......@@ -29,196 +29,194 @@
#define TK_TIMESTAMP 10
#define TK_BINARY 11
#define TK_NCHAR 12
#define TK_OR 13
#define TK_AND 14
#define TK_NOT 15
#define TK_EQ 16
#define TK_NE 17
#define TK_ISNULL 18
#define TK_NOTNULL 19
#define TK_IS 20
#define TK_LIKE 21
#define TK_MATCH 22
#define TK_NMATCH 23
#define TK_GLOB 24
#define TK_BETWEEN 25
#define TK_IN 26
#define TK_GT 27
#define TK_GE 28
#define TK_LT 29
#define TK_LE 30
#define TK_BITAND 31
#define TK_BITOR 32
#define TK_LSHIFT 33
#define TK_RSHIFT 34
#define TK_PLUS 35
#define TK_MINUS 36
#define TK_DIVIDE 37
#define TK_TIMES 38
#define TK_STAR 39
#define TK_SLASH 40
#define TK_REM 41
#define TK_UMINUS 42
#define TK_UPLUS 43
#define TK_BITNOT 44
#define TK_SHOW 45
#define TK_DATABASES 46
#define TK_TOPICS 47
#define TK_FUNCTIONS 48
#define TK_MNODES 49
#define TK_DNODES 50
#define TK_ACCOUNTS 51
#define TK_USERS 52
#define TK_MODULES 53
#define TK_QUERIES 54
#define TK_CONNECTIONS 55
#define TK_STREAMS 56
#define TK_VARIABLES 57
#define TK_SCORES 58
#define TK_GRANTS 59
#define TK_VNODES 60
#define TK_DOT 61
#define TK_CREATE 62
#define TK_TABLE 63
#define TK_STABLE 64
#define TK_DATABASE 65
#define TK_TABLES 66
#define TK_STABLES 67
#define TK_VGROUPS 68
#define TK_DROP 69
#define TK_TOPIC 70
#define TK_FUNCTION 71
#define TK_DNODE 72
#define TK_USER 73
#define TK_ACCOUNT 74
#define TK_USE 75
#define TK_DESCRIBE 76
#define TK_DESC 77
#define TK_ALTER 78
#define TK_PASS 79
#define TK_PRIVILEGE 80
#define TK_LOCAL 81
#define TK_COMPACT 82
#define TK_LP 83
#define TK_RP 84
#define TK_IF 85
#define TK_EXISTS 86
#define TK_AS 87
#define TK_OUTPUTTYPE 88
#define TK_AGGREGATE 89
#define TK_BUFSIZE 90
#define TK_PPS 91
#define TK_TSERIES 92
#define TK_DBS 93
#define TK_STORAGE 94
#define TK_QTIME 95
#define TK_CONNS 96
#define TK_STATE 97
#define TK_COMMA 98
#define TK_KEEP 99
#define TK_CACHE 100
#define TK_REPLICA 101
#define TK_QUORUM 102
#define TK_DAYS 103
#define TK_MINROWS 104
#define TK_MAXROWS 105
#define TK_BLOCKS 106
#define TK_CTIME 107
#define TK_WAL 108
#define TK_FSYNC 109
#define TK_COMP 110
#define TK_PRECISION 111
#define TK_UPDATE 112
#define TK_CACHELAST 113
#define TK_PARTITIONS 114
#define TK_UNSIGNED 115
#define TK_TAGS 116
#define TK_USING 117
#define TK_NULL 118
#define TK_NOW 119
#define TK_SELECT 120
#define TK_UNION 121
#define TK_ALL 122
#define TK_DISTINCT 123
#define TK_FROM 124
#define TK_VARIABLE 125
#define TK_RANGE 126
#define TK_INTERVAL 127
#define TK_EVERY 128
#define TK_SESSION 129
#define TK_STATE_WINDOW 130
#define TK_FILL 131
#define TK_SLIDING 132
#define TK_ORDER 133
#define TK_BY 134
#define TK_ASC 135
#define TK_GROUP 136
#define TK_HAVING 137
#define TK_LIMIT 138
#define TK_OFFSET 139
#define TK_SLIMIT 140
#define TK_SOFFSET 141
#define TK_WHERE 142
#define TK_RESET 143
#define TK_QUERY 144
#define TK_SYNCDB 145
#define TK_ADD 146
#define TK_COLUMN 147
#define TK_MODIFY 148
#define TK_TAG 149
#define TK_CHANGE 150
#define TK_SET 151
#define TK_KILL 152
#define TK_CONNECTION 153
#define TK_STREAM 154
#define TK_COLON 155
#define TK_ABORT 156
#define TK_AFTER 157
#define TK_ATTACH 158
#define TK_BEFORE 159
#define TK_BEGIN 160
#define TK_CASCADE 161
#define TK_CLUSTER 162
#define TK_CONFLICT 163
#define TK_COPY 164
#define TK_DEFERRED 165
#define TK_DELIMITERS 166
#define TK_DETACH 167
#define TK_EACH 168
#define TK_END 169
#define TK_EXPLAIN 170
#define TK_FAIL 171
#define TK_FOR 172
#define TK_IGNORE 173
#define TK_IMMEDIATE 174
#define TK_INITIALLY 175
#define TK_INSTEAD 176
#define TK_KEY 177
#define TK_OF 178
#define TK_RAISE 179
#define TK_REPLACE 180
#define TK_RESTRICT 181
#define TK_ROW 182
#define TK_STATEMENT 183
#define TK_TRIGGER 184
#define TK_VIEW 185
#define TK_IPTOKEN 186
#define TK_SEMI 187
#define TK_NONE 188
#define TK_PREV 189
#define TK_LINEAR 190
#define TK_IMPORT 191
#define TK_TBNAME 192
#define TK_JOIN 193
#define TK_INSERT 194
#define TK_INTO 195
#define TK_VALUES 196
#define TK_FILE 197
#define TK_JSON 13
#define TK_OR 14
#define TK_AND 15
#define TK_NOT 16
#define TK_EQ 17
#define TK_NE 18
#define TK_ISNULL 19
#define TK_NOTNULL 20
#define TK_IS 21
#define TK_LIKE 22
#define TK_MATCH 23
#define TK_NMATCH 24
#define TK_CONTAINS 25
#define TK_GLOB 26
#define TK_BETWEEN 27
#define TK_IN 28
#define TK_GT 29
#define TK_GE 30
#define TK_LT 31
#define TK_LE 32
#define TK_BITAND 33
#define TK_BITOR 34
#define TK_LSHIFT 35
#define TK_RSHIFT 36
#define TK_PLUS 37
#define TK_MINUS 38
#define TK_DIVIDE 39
#define TK_TIMES 40
#define TK_STAR 41
#define TK_SLASH 42
#define TK_REM 43
#define TK_UMINUS 44
#define TK_UPLUS 45
#define TK_BITNOT 46
#define TK_ARROW 47
#define TK_SHOW 48
#define TK_DATABASES 49
#define TK_TOPICS 50
#define TK_FUNCTIONS 51
#define TK_MNODES 52
#define TK_DNODES 53
#define TK_ACCOUNTS 54
#define TK_USERS 55
#define TK_MODULES 56
#define TK_QUERIES 57
#define TK_CONNECTIONS 58
#define TK_STREAMS 59
#define TK_VARIABLES 60
#define TK_SCORES 61
#define TK_GRANTS 62
#define TK_VNODES 63
#define TK_DOT 64
#define TK_CREATE 65
#define TK_TABLE 66
#define TK_STABLE 67
#define TK_DATABASE 68
#define TK_TABLES 69
#define TK_STABLES 70
#define TK_VGROUPS 71
#define TK_DROP 72
#define TK_TOPIC 73
#define TK_FUNCTION 74
#define TK_DNODE 75
#define TK_USER 76
#define TK_ACCOUNT 77
#define TK_USE 78
#define TK_DESCRIBE 79
#define TK_DESC 80
#define TK_ALTER 81
#define TK_PASS 82
#define TK_PRIVILEGE 83
#define TK_LOCAL 84
#define TK_COMPACT 85
#define TK_LP 86
#define TK_RP 87
#define TK_IF 88
#define TK_EXISTS 89
#define TK_AS 90
#define TK_OUTPUTTYPE 91
#define TK_AGGREGATE 92
#define TK_BUFSIZE 93
#define TK_PPS 94
#define TK_TSERIES 95
#define TK_DBS 96
#define TK_STORAGE 97
#define TK_QTIME 98
#define TK_CONNS 99
#define TK_STATE 100
#define TK_COMMA 101
#define TK_KEEP 102
#define TK_CACHE 103
#define TK_REPLICA 104
#define TK_QUORUM 105
#define TK_DAYS 106
#define TK_MINROWS 107
#define TK_MAXROWS 108
#define TK_BLOCKS 109
#define TK_CTIME 110
#define TK_WAL 111
#define TK_FSYNC 112
#define TK_COMP 113
#define TK_PRECISION 114
#define TK_UPDATE 115
#define TK_CACHELAST 116
#define TK_PARTITIONS 117
#define TK_UNSIGNED 118
#define TK_TAGS 119
#define TK_USING 120
#define TK_NULL 121
#define TK_NOW 122
#define TK_SELECT 123
#define TK_UNION 124
#define TK_ALL 125
#define TK_DISTINCT 126
#define TK_FROM 127
#define TK_VARIABLE 128
#define TK_RANGE 129
#define TK_INTERVAL 130
#define TK_EVERY 131
#define TK_SESSION 132
#define TK_STATE_WINDOW 133
#define TK_FILL 134
#define TK_SLIDING 135
#define TK_ORDER 136
#define TK_BY 137
#define TK_ASC 138
#define TK_GROUP 139
#define TK_HAVING 140
#define TK_LIMIT 141
#define TK_OFFSET 142
#define TK_SLIMIT 143
#define TK_SOFFSET 144
#define TK_WHERE 145
#define TK_RESET 146
#define TK_QUERY 147
#define TK_SYNCDB 148
#define TK_ADD 149
#define TK_COLUMN 150
#define TK_MODIFY 151
#define TK_TAG 152
#define TK_CHANGE 153
#define TK_SET 154
#define TK_KILL 155
#define TK_CONNECTION 156
#define TK_STREAM 157
#define TK_COLON 158
#define TK_ABORT 159
#define TK_AFTER 160
#define TK_ATTACH 161
#define TK_BEFORE 162
#define TK_BEGIN 163
#define TK_CASCADE 164
#define TK_CLUSTER 165
#define TK_CONFLICT 166
#define TK_COPY 167
#define TK_DEFERRED 168
#define TK_DELIMITERS 169
#define TK_DETACH 170
#define TK_EACH 171
#define TK_END 172
#define TK_EXPLAIN 173
#define TK_FAIL 174
#define TK_FOR 175
#define TK_IGNORE 176
#define TK_IMMEDIATE 177
#define TK_INITIALLY 178
#define TK_INSTEAD 179
#define TK_KEY 180
#define TK_OF 181
#define TK_RAISE 182
#define TK_REPLACE 183
#define TK_RESTRICT 184
#define TK_ROW 185
#define TK_STATEMENT 186
#define TK_TRIGGER 187
#define TK_VIEW 188
#define TK_IPTOKEN 189
#define TK_SEMI 190
#define TK_NONE 191
#define TK_PREV 192
#define TK_LINEAR 193
#define TK_IMPORT 194
#define TK_TBNAME 195
#define TK_JOIN 196
#define TK_INSERT 197
#define TK_INTO 198
#define TK_VALUES 199
#define TK_FILE 200
......
......@@ -236,6 +236,8 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL;
case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL;
case TSDB_DATA_TYPE_JSON:
return varDataLen(val) == sizeof(int32_t) && *(uint32_t *) varDataVal(val) == TSDB_DATA_JSON_NULL;
case TSDB_DATA_TYPE_NCHAR:
return varDataLen(val) == sizeof(int32_t) && *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
case TSDB_DATA_TYPE_BINARY:
......@@ -266,10 +268,10 @@ typedef struct tDataTypeDescriptor {
int (*decompFunc)(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize);
void (*statisFunc)(const void *pData, int32_t numofrow, int64_t *min, int64_t *max, int64_t *sum,
int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
int16_t *minindex, int16_t *maxindex, int16_t *numofnull);
} tDataTypeDescriptor;
extern tDataTypeDescriptor tDataTypes[15];
extern tDataTypeDescriptor tDataTypes[16];
bool isValidDataType(int32_t type);
......
......@@ -509,6 +509,7 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length);
buf[length] = 0;
fprintf(fp, "\'%s\'", buf);
......@@ -692,6 +693,7 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......@@ -805,7 +807,8 @@ static int calcColWidth(TAOS_FIELD* field, int precision) {
return MAX(field->bytes, width);
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:{
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > tsMaxBinaryDisplayWidth) {
return MAX(tsMaxBinaryDisplayWidth, width);
......
......@@ -21,7 +21,6 @@
#include "tlog.h"
#include "ttimer.h"
#include "tutil.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "dnode.h"
#include "vnode.h"
......
Subproject commit f108f5240918d0eec90debd1ff469c98ff0f25ac
Subproject commit 88346a2e4e2e9282d2ec8b8c5264ca1ec23698a1
......@@ -200,7 +200,7 @@ typedef struct SQLFunctionCtx {
SResultRowCellInfo *resultInfo;
int16_t colId;
int16_t colId; // used for user-specified constant value
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册