diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5ed0550cd8264eae79eed93cbd69d7ee6d56a6eb..49a4b61053d33757efc1d1979091ac2d6b8699f8 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -171,6 +171,8 @@ typedef struct STagCond { typedef struct STableDataBlocks { char meterId[TSDB_METER_ID_LEN]; + int8_t tsSource; + int64_t vgid; int64_t size; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 7dec7280f83d2ccf238381cf232e17bede6c960d..5d3aa081d630c41b3c4bcb80af406e6af0cdbaa9 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -53,6 +53,11 @@ return TSDB_CODE_INVALID_SQL; \ } while (0) +static enum { + TSDB_USE_SERVER_TS = 0, + TSDB_USE_CLI_TS = 1, +}; + static void setErrMsg(char *msg, char *sql); static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize); @@ -166,6 +171,40 @@ int tsParseTime(char *value, int32_t valuelen, int64_t *time, char **next, char return TSDB_CODE_SUCCESS; } +/* + * The server time/client time should not be mixed up in one sql string + * Do not employ sort operation is not involved if server time is used. + */ +static int32_t tsCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) { + // once the data block is disordered, we do NOT keep previous timestamp any more + if (!pDataBlocks->ordered) { + return TSDB_CODE_SUCCESS; + } + + TSKEY k = *(TSKEY *) start; + + if (k == 0) { + if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) { + return -1; + } else if (pDataBlocks->tsSource == -1) { + pDataBlocks->tsSource = TSDB_USE_SERVER_TS ; + } + } else { + if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) { + return -1; + } else if (pDataBlocks->tsSource == -1) { + pDataBlocks->tsSource = TSDB_USE_CLI_TS; + } + } + + if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) { + pDataBlocks->ordered = false; + } + + pDataBlocks->prevTS = k; + return TSDB_CODE_SUCCESS; +} + int32_t tsParseOneColumnData(SSchema *pSchema, char *value, int valuelen, char *payload, char *msg, char **str, bool primaryKey, int16_t timePrec) { int64_t temp; @@ -391,20 +430,15 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ valuelen++; } + bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); int32_t ret = tsParseOneColumnData(&schema[colIndex], value, valuelen, start, error, str, colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX, timePrec); - if (ret != 0) { + if (ret != TSDB_CODE_SUCCESS) { return -1; // NOTE: here 0 mean error! } - // once the data block is disordered, we do NOT keep previous timestamp any more - if (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pDataBlocks->ordered) { - TSKEY k = *(TSKEY *)start; - if (k <= pDataBlocks->prevTS) { - pDataBlocks->ordered = false; - } - - pDataBlocks->prevTS = k; + if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { + return -1; } } @@ -551,6 +585,11 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { // size is less than the total size, since duplicated rows may be removed yet. assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size); + // if use server time, this block must be ordered + if (dataBuf->tsSource == TSDB_USE_SERVER_TS) { + assert(dataBuf->ordered); + } + if (!dataBuf->ordered) { char *pBlockData = pBlocks->payLoad; qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f86016e981072b596ab57ef92a993f9e3b39ed5a..2bd7474fbedae60c15f6cfbac52bb961f7906c98 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -338,7 +338,8 @@ STableDataBlocks* tscCreateDataBlock(int32_t size) { STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); dataBuf->nAllocSize = (uint32_t)size; dataBuf->pData = calloc(1, dataBuf->nAllocSize); - dataBuf->ordered = true; + + dataBuf->tsSource = -1; dataBuf->prevTS = INT64_MIN; return dataBuf; }