diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f0c8ffebc2c30514b8ecb7727f963c3ce9007735..4400ca2c25b3c78971a66ebcf1aac425a1356c6a 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -891,11 +891,15 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z); } + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + uint32_t ignoreTokenTypes = TK_LP; uint32_t numOfIgnoreToken = 1; for (int i = 0; i < spd.numOfAssignedCols; ++i) { - char * tagVal = pTag->data + spd.elems[i].offset; - int16_t colIndex = spd.elems[i].colIndex; + SSchema* pSchema = pTagSchema + spd.elems[i].colIndex; index = 0; sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes); @@ -911,12 +915,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { sToken.n -= 2; } - code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision); + char tagVal[TSDB_MAX_TAGS_LEN]; + code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision); if (code != TSDB_CODE_SUCCESS) { + tdDestroyKVRowBuilder(&kvRowBuilder); return code; } + + tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); } + SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + if (row == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + tdSortKVRowByColIdx(row); + pTag->dataLen = kvRowLen(row); + kvRowCpy(pTag->data, row); + free(row); + index = 0; sToken = tStrGetToken(sql, &index, false, 0, NULL); sql += index; @@ -924,29 +942,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z); } - // 2. set the null value for the columns that do not assign values - if (spd.numOfAssignedCols < spd.numOfCols) { - char *ptr = pTag->data; - - for (int32_t i = 0; i < spd.numOfCols; ++i) { - if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null - if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(ptr, pTagSchema[i].type); - } else { - setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes); - } - } - - ptr += pTagSchema[i].bytes; - } - } - - // 3. calculate the actual data size of STagData - pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen); - for (int32_t t = 0; t < numOfTags; ++t) { - pTag->dataLen += pTagSchema[t].bytes; - pCmd->payloadLen += pTagSchema[t].bytes; - } + pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen; pTag->dataLen = htonl(pTag->dataLen); if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index df18d7a56baa42d3286df29e156b1317c3e942cf..9f557f5529e03ee504aea2c30cc6bfac113a06aa 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5623,24 +5623,41 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { SSchema* pTagSchema = tscGetTableTagSchema(pStableMeterMetaInfo->pTableMeta); STagData* pTag = &pCreateTable->usingInfo.tagdata; - char* tagVal = pTag->data; + SKVRowBuilder kvRowBuilder = {0}; + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + int32_t ret = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < pList->nExpr; ++i) { - if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) { + SSchema* pSchema = pTagSchema + i; + if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { // validate the length of binary - if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pTagSchema[i].bytes) { + if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pSchema->bytes) { + tdDestroyKVRowBuilder(&kvRowBuilder); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } - ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type, true); + char tagVal[TSDB_MAX_TAGS_LEN]; + ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true); if (ret != TSDB_CODE_SUCCESS) { + tdDestroyKVRowBuilder(&kvRowBuilder); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } - tagVal += pTagSchema[i].bytes; + tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal); + } + + SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); + tdDestroyKVRowBuilder(&kvRowBuilder); + if (row == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; } + tdSortKVRowByColIdx(row); + pTag->dataLen = kvRowLen(row); + kvRowCpy(pTag->data, row); + free(row); // table name if (tscValidateName(&pInfo->pCreateTableInfo->name) != TSDB_CODE_SUCCESS) { @@ -5653,7 +5670,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return ret; } - pTag->dataLen = tagVal - pTag->data; return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 259bcd4cbd2e63f52c33f072908ef647731d750e..26a81c597f077e5101b3920e792c650d00ca50f3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -625,18 +625,31 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo return len; } +static int32_t getRowExpandSize(STableMeta* pTableMeta) { + int32_t result = TD_DATA_ROW_HEAD_SIZE; + int32_t columns = tscGetNumOfColumns(pTableMeta); + SSchema* pSchema = tscGetTableSchema(pTableMeta); + for(int32_t i = 0; i < columns; i++) { + if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { + result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; + } + } + return result; +} + int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { SSqlCmd* pCmd = &pSql->cmd; // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; + STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0); + int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); size_t total = taosArrayGetSize(pTableDataBlockList); for (int32_t i = 0; i < total; ++i) { - STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); + pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); STableDataBlocks* dataBuf = NULL; int32_t ret = @@ -650,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { } SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; - int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * MAX_EXPAND_SIZE; + int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize; if (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) { @@ -678,8 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); - - int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + MAX_EXPAND_SIZE); + int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize); pBlocks->tid = htonl(pBlocks->tid); pBlocks->uid = htobe64(pBlocks->uid); diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 1cd72eafdedd512bb2d675613b178252efe5d5b2..baa212d8b76fd07625c19c49299efa77fb58768c 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -272,7 +272,7 @@ typedef struct { int16_t offset; } SColIdx; -#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t) +#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t)) #define kvRowLen(r) (*(int16_t *)(r)) #define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) @@ -290,6 +290,7 @@ SKVRow tdKVRowDup(SKVRow row); int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value); int tdEncodeKVRow(void **buf, SKVRow row); void * tdDecodeKVRow(void *buf, SKVRow *row); +void tdSortKVRowByColIdx(SKVRow row); static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) { diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index d7b2af870677ba1d96286b5dc9b29137369251ca..e5cbcfd143c642f7bc871e12d833cdcf544536be 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -515,6 +515,22 @@ SKVRow tdKVRowDup(SKVRow row) { return trow; } +static int compareColIdx(const void* a, const void* b) { + const SColIdx* x = (const SColIdx*)a; + const SColIdx* y = (const SColIdx*)b; + if (x->colId > y->colId) { + return 1; + } + if (x->colId < y->colId) { + return -1; + } + return 0; +} + +void tdSortKVRowByColIdx(SKVRow row) { + qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx); +} + int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { SColIdx *pColIdx = NULL; SKVRow row = *orow; diff --git a/src/common/src/ttypes.c b/src/common/src/ttypes.c index 3c63e2deacc88aed967508a6affc44b1bc7d85ca..eff25e8c93047a174dddeb1f4b281942e94959cb 100644 --- a/src/common/src/ttypes.c +++ b/src/common/src/ttypes.c @@ -464,6 +464,29 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { } } +static uint8_t nullBool = TSDB_DATA_BOOL_NULL; +static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; +static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; +static uint32_t nullInt = TSDB_DATA_INT_NULL; +static uint64_t nullBigInt = TSDB_DATA_BIGINT_NULL; +static uint32_t nullFloat = TSDB_DATA_FLOAT_NULL; +static uint64_t nullDouble = TSDB_DATA_DOUBLE_NULL; + +static union { + tstr str; + char pad[sizeof(tstr) + 4]; +} nullBinary = {.str = {.len = 1}}, nullNchar = {.str = {.len = 4}}; + +static void *nullValues[] = { + &nullBool, &nullTinyInt, &nullSmallInt, &nullInt, &nullBigInt, + &nullFloat, &nullDouble, &nullBinary, &nullBigInt, &nullNchar, +}; + +void *getNullValue(int32_t type) { + assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_NCHAR); + return nullValues[type - 1]; +} + void assignVal(char *val, const char *src, int32_t len, int32_t type) { switch (type) { case TSDB_DATA_TYPE_INT: { diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 098d69fcb206acbd5ec88d66cc1c3b9c347d02ba..3e7e8525efc90bbe877e8636a34ded4851805ace 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -256,30 +256,13 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SDataRow trow = (SDataRow)pBlk->data; tdInitDataRow(trow, pSchema); - union { - char buf[sizeof(int64_t)]; - tstr str; - } nullVal; - for (int32_t i = 0; i < pSchema->numOfCols; i++) { STColumn *c = pSchema->columns + i; - char* val = (char*)row[i]; - if (IS_VAR_DATA_TYPE(c->type)) { - if (val == NULL) { - val = nullVal.buf; - if (c->type == TSDB_DATA_TYPE_BINARY) { - setNull(nullVal.str.data, TSDB_DATA_TYPE_BINARY, 1); - nullVal.str.len = 1; - } else { - setNull(nullVal.str.data, TSDB_DATA_TYPE_NCHAR, 4); - nullVal.str.len = 4; - } - } else { - val -= sizeof(VarDataLenT); - } - } else if (val == NULL) { - val = nullVal.buf; - setNull(val, c->type, c->bytes); + void* val = row[i]; + if (val == NULL) { + val = getNullValue(c->type); + } else if (IS_VAR_DATA_TYPE(c->type)) { + val = ((char*)val) - sizeof(VarDataLenT); } tdAppendColVal(trow, val, c->type, c->bytes, c->offset); } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index ecf78edfd5725b7ae701bbf100d6b422b65f04fc..76ca99c9ad26bb9228cd1d94cf463657bf1a5f8f 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -165,6 +165,7 @@ bool isNull(const char *val, int32_t type); void setVardataNull(char* val, int32_t type); void setNull(char *val, int32_t type, int32_t bytes); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); +void* getNullValue(int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 13fa799b3fedaba096467757746f59b9a15a8ec6..cb25242d27d3f8e50af643df3c447c7aaae76484 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -26,6 +26,7 @@ extern "C" { #include "taosdef.h" #include "taoserror.h" #include "trpc.h" +#include "tdataformat.h" // message type @@ -674,7 +675,7 @@ typedef struct SMultiTableMeta { typedef struct { int32_t dataLen; char name[TSDB_TABLE_ID_LEN]; - char data[TSDB_MAX_TAGS_LEN]; + char data[TSDB_MAX_TAGS_LEN + TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * TSDB_MAX_TAGS]; } STagData; /* diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index b078be59300a2390246b905ca5f2156464e34f74..b09f34b562294e13f9ee7e9e728f1ad974f8c9b1 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -141,10 +141,15 @@ HttpContext *httpGetContext(void *ptr) { void httpReleaseContext(HttpContext *pContext) { int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); assert(refCount >= 0); - httpDebug("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount); + httpDebug("context:%p, is releasd, refCount:%d", pContext, refCount); HttpContext **ppContext = pContext->ppContext; - taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); + if (tsHttpServer.contextCache != NULL) { + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); + } else { + httpDebug("context:%p, won't be destroyed for cache is already released", pContext); + // httpDestroyContext((void **)(&ppContext)); + } } bool httpInitContext(HttpContext *pContext) { diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 758286344a357ea42e2025aabebe9c6550750cf1..056fe425d4a8db454c8141aa4cae775f6fe2bd6f 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -157,7 +157,7 @@ bool httpGetHttpMethod(HttpContext* pContext) { pParser->method.pos[pParser->method.len] = 0; pParser->pLast = pSeek + 1; - httpDebug("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos); + httpTrace("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos); return true; } @@ -186,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) { pParser->data.len = (int32_t)atoi(pParser->pLast + 16); - httpDebug("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, + httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, pParser->data.len); } else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) { if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) { pContext->acceptEncoding = HTTP_COMPRESS_GZIP; - httpDebug("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); } else { pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; - httpDebug("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr); } } else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) { if (strstr(pParser->pLast + 18, "gzip") != NULL) { pContext->contentEncoding = HTTP_COMPRESS_GZIP; - httpDebug("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); } else { pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; - httpDebug("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr); } } else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) { if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) { @@ -210,7 +210,7 @@ bool httpParseHead(HttpContext* pContext) { } else { pContext->httpKeepAlive = HTTP_KEEPALIVE_DISABLE; } - httpDebug("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr, + httpTrace("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr, pContext->httpKeepAlive); } else if (strncasecmp(pParser->pLast, "Transfer-Encoding: ", 19) == 0) { if (strncasecmp(pParser->pLast + 19, "chunked", 7) == 0) { @@ -281,7 +281,7 @@ bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { httpParseChunkedBody(pContext, pParser, false); return HTTP_CHECK_BODY_SUCCESS; } else { - httpDebug("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); if (!httpReadDataImp(pContext)) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); return HTTP_CHECK_BODY_ERROR; @@ -299,7 +299,7 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); return HTTP_CHECK_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { - httpDebug("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", + httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); return HTTP_CHECK_BODY_CONTINUE; } else { @@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) { return true; } - httpDebug("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, - pContext->parser.bufsize, pContext->parser.buffer); + httpTraceDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd, + pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize, + pContext->parser.buffer); if (!httpGetHttpMethod(pContext)) { return false; diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 319d87d49690e9ca863453191a84b721f768cfd5..9276637d0ee99001d3ec2fc9f50626398a12ee24 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -76,8 +76,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s", pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); } else { - httpDebug("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", - pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); + httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, + pContext->ipstr, sz, writeSz, buf); } return writeSz; @@ -99,7 +99,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); if (buf->pContext->fd <= 0) { - httpDebug("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); buf->pContext->fd = -1; } @@ -113,11 +113,11 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) { if (buf->lst == buf->buf) { - httpDebug("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); return 0; // there is no data to dump. } else { int len = sprintf(sLen, "%lx\r\n", srcLen); - httpDebug("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s", + httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen); @@ -129,12 +129,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { if (ret == 0) { if (compressBufLen > 0) { int len = sprintf(sLen, "%x\r\n", compressBufLen); - httpDebug("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", + httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen); } else { - httpDebug("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s", + httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s", buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf); return 0; // there is no data to dump. } @@ -173,7 +173,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) { void httpWriteJsonBufEnd(JsonBuf* buf) { if (buf->pContext->fd <= 0) { - httpDebug("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); buf->pContext->fd = -1; } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index cef0e806903244bb90e7c53062101ee25fd661b9..6c82386d8187410544219456368be0b399928207 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -66,8 +66,6 @@ void httpCleanUpConnect() { } } - tfree(pServer->pThreads); - pServer->pThreads = NULL; httpDebug("http server:%s is cleaned up", pServer->label); } diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index a93e7cd7ad0147b831ca5f8e961a0a53f6da56d2..3a0998f2e8ee4d03ee7db7c083cf79d63acfaeb3 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -95,11 +95,13 @@ void httpCleanUpSystem() { httpInfo("http server cleanup"); httpStopSystem(); + httpCleanUpConnect(); httpCleanupContexts(); httpCleanUpSessions(); - httpCleanUpConnect(); pthread_mutex_destroy(&tsHttpServer.serverMutex); - + tfree(tsHttpServer.pThreads); + tsHttpServer.pThreads = NULL; + tsHttpServer.status = HTTP_SERVER_CLOSED; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index ebc83d0cc268152ef4ca6132ff1cb3de89d1140d..9c9ac1699a38f1545de273b41792ecb32677b812 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -233,26 +233,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; - // Decode tag values - if (pMsg->tagDataLen) { - int accBytes = 0; + int32_t tagDataLen = htonl(pMsg->tagDataLen); + if (tagDataLen) { char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); - - SKVRowBuilder kvRowBuilder = {0}; - if (tdInitKVRowBuilder(&kvRowBuilder) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - for (int i = numOfCols; i < numOfCols + numOfTags; i++) { - if (tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - accBytes += htons(pSchema[i].bytes); - } - - tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); - tdDestroyKVRowBuilder(&kvRowBuilder); + tsdbTableSetTagValue(pCfg, pTagData, true); } } @@ -620,6 +604,10 @@ static char *getTagIndexKey(const void *pData) { STSchema *pSchema = tsdbGetTableTagSchema(pTable); STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN); void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); + if (res == NULL) { + // treat the column as NULL if we cannot find it + res = getNullValue(pCol->type); + } return res; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2e57ad83aee0a59246bbe6ec25012daecbe916d4..720741f0893086d4c9c3fb59bee0adfe4dc3f534 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -119,7 +119,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo int32_t size = pNode->size; taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - uDebug("key:%s is removed from cache,total:%" PRId64 ",size:%dbytes", pNode->key, pCacheObj->totalSize, size); + uDebug("key:%s, is removed from cache, total:%" PRId64 " size:%d bytes", pNode->key, pCacheObj->totalSize, size); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); free(pNode); } @@ -288,14 +288,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz if (NULL != pNode) { pCacheObj->totalSize += pNode->size; - uDebug("key:%s %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", + uDebug("key:%s, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes", key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize); } else { - uError("key:%s failed to added into cache, out of memory", key); + uError("key:%s, failed to added into cache, out of memory", key); } } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); - uDebug("key:%s %p exist in cache, updated", key, pNode); + uDebug("key:%s, %p exist in cache, updated", key, pNode); } __cache_unlock(pCacheObj); @@ -321,10 +321,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + uDebug("key:%s, is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%s not in cache, retrieved failed", key); + uDebug("key:%s, not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -350,10 +350,10 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uin if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + uDebug("key:%s, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("key:%s not in cache, retrieved failed", key); + uDebug("key:%s, not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -410,13 +410,13 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); if (pNode->signature != (uint64_t)pNode) { - uError("key: %p release invalid cache data", pNode); + uError("%p release invalid cache data", pNode); return; } *data = NULL; int32_t ref = T_REF_DEC(pNode); - uDebug("%p data released, refcnt:%d", pNode, ref); + uDebug("key:%s, is released, %p refcnt:%d", pNode->key, pNode, ref); if (_remove) { __cache_wr_lock(pCacheObj); @@ -501,7 +501,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pNode->inTrashCan = true; pCacheObj->numOfElemsInTrash++; - uDebug("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); + uDebug("key:%s, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); } void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { @@ -549,7 +549,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { } if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { - uDebug("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, + uDebug("key:%s, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, pCacheObj->numOfElemsInTrash - 1); STrashElem *p = pElem; @@ -570,8 +570,11 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { while (taosHashIterNext(pIter)) { SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); - //} + if (T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } else { + uDebug("key:%s, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode)); + } } taosHashDestroyIter(pIter);