From 4e954c9e60da076ecefd7f25f1058bc12f27464b Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 27 Nov 2021 19:43:19 +0800 Subject: [PATCH] [TS-530] support CQ write to supertable --- src/client/src/tscSql.c | 83 ++++++++++++++++++++++++++++++++++++++ src/client/src/tscStream.c | 36 +++++++++++++---- src/inc/taos.h | 3 ++ src/query/src/queryMain.c | 1 + 4 files changed, 115 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index eb02d00988..56bd1d305d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -894,6 +894,89 @@ int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_field return len; } +// print field value to str +int taos_print_field(char *str, void* value, TAOS_FIELD *field) { + // check valid + if (str == NULL || value == NULL || field == NULL) { + return 0; + } + + // get value + int len = 0; + switch (field->type) { + // + // fixed length + // + case TSDB_DATA_TYPE_TINYINT: + len = sprintf(str, "%d", *((int8_t *)value)); + break; + + case TSDB_DATA_TYPE_UTINYINT: + len = sprintf(str, "%u", *((uint8_t *)value)); + break; + + case TSDB_DATA_TYPE_SMALLINT: + len = sprintf(str, "%d", *((int16_t *)value)); + break; + + case TSDB_DATA_TYPE_USMALLINT: + len = sprintf(str, "%u", *((uint16_t *)value)); + break; + + case TSDB_DATA_TYPE_INT: + len = sprintf(str, "%d", *((int32_t *)value)); + break; + + case TSDB_DATA_TYPE_UINT: + len = sprintf(str, "%u", *((uint32_t *)value)); + break; + + case TSDB_DATA_TYPE_BIGINT: + len = sprintf(str, "%" PRId64, *((int64_t *)value)); + break; + + case TSDB_DATA_TYPE_UBIGINT: + len = sprintf(str, "%" PRIu64, *((uint64_t *)value)); + break; + + case TSDB_DATA_TYPE_FLOAT: { + float fv = 0; + fv = GET_FLOAT_VAL(value); + len = sprintf(str, "%f", fv); + } break; + + case TSDB_DATA_TYPE_DOUBLE: { + double dv = 0; + dv = GET_DOUBLE_VAL(value); + len = sprintf(str, "%lf", dv); + } break; + + case TSDB_DATA_TYPE_TIMESTAMP: + len = sprintf(str, "%" PRId64, *((int64_t *)value)); + break; + case TSDB_DATA_TYPE_BOOL: + len = sprintf(str, "%d", *((int8_t *)value)); + + // + // variant length + // + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: { + len = varDataLen((char*)value - VARSTR_HEADER_SIZE); + if (field->type == TSDB_DATA_TYPE_BINARY) { + assert(len <= field->bytes && len >= 0); + } else { + assert(len <= field->bytes * TSDB_NCHAR_SIZE && len >= 0); + } + memcpy(str, value, len); + } break; + + default: + break; + } + return len; +} + static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSqlObj *pSql = ((SSqlObj *)param); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f4e6be8c9e..78c98113f5 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -461,17 +461,30 @@ bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32 // add row to hash to group by tbname bool tbHashAdd(SHashObj *tbHash, TAOS_ROW row, TAOS_FIELD* fields, int32_t idx, int32_t numCols) { void *v = row[idx]; - VarDataLenT len = varDataLen((char*)v - VARSTR_HEADER_SIZE); - char *key = v; + TAOS_FIELD *field = &fields[idx]; + VarDataLenT len = 0; + char str[128]; + memset(str, 0, sizeof(str)); + char *key = str; + + // get key and len + if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_NCHAR) { + key = v; + len = varDataLen((char*)v - VARSTR_HEADER_SIZE); + } else { + len = taos_print_field(str, v, field); + } + if(len == 0) { + return false; + } - // get array point from hash + // append key with len SArray *arr = NULL; - // get arr from hash void* pdata = taosHashGet(tbHash, key, len); if(pdata) { arr = *(SArray **)pdata; } - // get arr from new + // if group is null create new if(arr == NULL) { arr = (SArray *)taosArrayInit(10, sizeof(TAOS_ROW)); if(arr == NULL) { @@ -481,7 +494,7 @@ bool tbHashAdd(SHashObj *tbHash, TAOS_ROW row, TAOS_FIELD* fields, int32_t idx, taosHashPut(tbHash, key, len, &arr, sizeof(SArray *)); } - // add to array + // append to group int32_t new_len = sizeof(void*) * numCols; TAOS_ROW new_row = (TAOS_ROW)tmalloc(new_len); memcpy(new_row, row, new_len); @@ -507,7 +520,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. // init hash SHashObj* tbHash = NULL; - int32_t colIdx = 1; + int32_t colIdx = -1; TAOS_FIELD *fields = NULL; int32_t dstColsNum = pStream->dstCols; int32_t fieldsNum = 0; @@ -529,6 +542,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf break; } } + + // set default with last fields if + if(colIdx == -1) { + colIdx = fieldsNum - 1; + } } // save rows @@ -560,7 +578,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision); } // actually only one row is returned. this following is not necessary - taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); + if(pQueryInfo->pQInfo->code == TSDB_CODE_SUCCESS) { + taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); + } } else { // numOfRows == 0, all data has been retrieved pStream->useconds += pSql->res.useconds; if (pStream->numOfRes == 0) { diff --git a/src/inc/taos.h b/src/inc/taos.h index 0d7a426dae..460ae34156 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -137,8 +137,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); +// row to string DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota); +// one field to string +DLL_EXPORT int taos_print_field(char *str, void* value, TAOS_FIELD *field); DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 28c5df3251..ee88b25646 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -220,6 +220,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId); + setQueryKilled(pQInfo); return doBuildResCheck(pQInfo); } -- GitLab