Unverified commit 839b8a2e, authored by Shengliang Guan, committed by GitHub

Merge pull request #2519 from taosdata/bugfix/td-803

Bugfix/td 803
......@@ -891,11 +891,15 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
}
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
uint32_t ignoreTokenTypes = TK_LP;
uint32_t numOfIgnoreToken = 1;
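// build a KV row from the explicitly assigned tag columns: each parsed value is
// keyed by its column id instead of being written at a fixed schema offset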
for (int i = 0; i < spd.numOfAssignedCols; ++i) {
char * tagVal = pTag->data + spd.elems[i].offset;
int16_t colIndex = spd.elems[i].colIndex;
SSchema* pSchema = pTagSchema + spd.elems[i].colIndex;
index = 0;
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
......@@ -911,12 +915,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sToken.n -= 2;
}
code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
char tagVal[TSDB_MAX_TAGS_LEN];
code = tsParseOneColumnData(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return code;
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
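// finalize the builder into an SKVRow, keep its column index sorted by colId,
// and copy the serialized row into the fixed-size tag buffer of STagData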
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tdSortKVRowByColIdx(row);
pTag->dataLen = kvRowLen(row);
kvRowCpy(pTag->data, row);
free(row);
index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index;
......@@ -924,29 +942,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z);
}
// 2. set the null value for the columns that do not assign values
if (spd.numOfAssignedCols < spd.numOfCols) {
char *ptr = pTag->data;
for (int32_t i = 0; i < spd.numOfCols; ++i) {
if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(ptr, pTagSchema[i].type);
} else {
setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
}
}
ptr += pTagSchema[i].bytes;
}
}
// 3. calculate the actual data size of STagData
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen);
for (int32_t t = 0; t < numOfTags; ++t) {
pTag->dataLen += pTagSchema[t].bytes;
pCmd->payloadLen += pTagSchema[t].bytes;
}
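// with the KV-row encoding, unassigned tag columns are simply absent from the row,
// so the old NULL-filling pass and per-column size accumulation are no longer needed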
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
pTag->dataLen = htonl(pTag->dataLen);
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
......
......@@ -5623,24 +5623,41 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SSchema* pTagSchema = tscGetTableTagSchema(pStableMeterMetaInfo->pTableMeta);
STagData* pTag = &pCreateTable->usingInfo.tagdata;
char* tagVal = pTag->data;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t ret = TSDB_CODE_SUCCESS;
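// same KV-row construction on the SQL-parser path: dump each explicit tag value
// into a scratch buffer and register it with the builder under its column id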
for (int32_t i = 0; i < pList->nExpr; ++i) {
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
SSchema* pSchema = pTagSchema + i;
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
// validate the length of binary
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pTagSchema[i].bytes) {
if (pList->a[i].pVar.nLen + VARSTR_HEADER_SIZE > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type, true);
char tagVal[TSDB_MAX_TAGS_LEN];
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pSchema->type, true);
if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
tagVal += pTagSchema[i].bytes;
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tdSortKVRowByColIdx(row);
pTag->dataLen = kvRowLen(row);
kvRowCpy(pTag->data, row);
free(row);
// table name
if (tscValidateName(&pInfo->pCreateTableInfo->name) != TSDB_CODE_SUCCESS) {
......@@ -5653,7 +5670,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return ret;
}
pTag->dataLen = tagVal - pTag->data;
return TSDB_CODE_SUCCESS;
}
......
......@@ -625,18 +625,31 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
return len;
}
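// per-row expansion (in bytes) when client row data is converted to the SDataRow
// format: the data-row header plus TYPE_BYTES[TSDB_DATA_TYPE_BINARY] for every
// variable-length column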
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_DATA_ROW_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
}
return result;
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
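// the expansion size is derived once from the first block's table meta and reused
// for every block merged in the loop below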
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL;
int32_t ret =
......@@ -650,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
}
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * MAX_EXPAND_SIZE;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize;
if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) {
......@@ -678,8 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + MAX_EXPAND_SIZE);
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize);
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
......
......@@ -272,7 +272,7 @@ typedef struct {
int16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t)
#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t))
#define kvRowLen(r) (*(int16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
......@@ -290,6 +290,7 @@ SKVRow tdKVRowDup(SKVRow row);
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
int tdEncodeKVRow(void **buf, SKVRow row);
void * tdDecodeKVRow(void *buf, SKVRow *row);
void tdSortKVRowByColIdx(SKVRow row);
static FORCE_INLINE int comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
......
......@@ -515,6 +515,22 @@ SKVRow tdKVRowDup(SKVRow row) {
return trow;
}
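// keep the SColIdx array of a KV row ordered by colId so that per-column lookups
// on the row can rely on a sorted index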
static int compareColIdx(const void* a, const void* b) {
const SColIdx* x = (const SColIdx*)a;
const SColIdx* y = (const SColIdx*)b;
if (x->colId > y->colId) {
return 1;
}
if (x->colId < y->colId) {
return -1;
}
return 0;
}
void tdSortKVRowByColIdx(SKVRow row) {
qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
SColIdx *pColIdx = NULL;
SKVRow row = *orow;
......
......@@ -464,6 +464,29 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
}
}
static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
static uint32_t nullInt = TSDB_DATA_INT_NULL;
static uint64_t nullBigInt = TSDB_DATA_BIGINT_NULL;
static uint32_t nullFloat = TSDB_DATA_FLOAT_NULL;
static uint64_t nullDouble = TSDB_DATA_DOUBLE_NULL;
static union {
tstr str;
char pad[sizeof(tstr) + 4];
} nullBinary = {.str = {.len = 1}}, nullNchar = {.str = {.len = 4}};
static void *nullValues[] = {
&nullBool, &nullTinyInt, &nullSmallInt, &nullInt, &nullBigInt,
&nullFloat, &nullDouble, &nullBinary, &nullBigInt, &nullNchar,
};
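// one statically allocated NULL constant per data type, indexed by (type - 1);
// getNullValue hands out a pointer so callers no longer need a scratch buffer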
void *getNullValue(int32_t type) {
assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_NCHAR);
return nullValues[type - 1];
}
void assignVal(char *val, const char *src, int32_t len, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
......
......@@ -256,30 +256,13 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SDataRow trow = (SDataRow)pBlk->data;
tdInitDataRow(trow, pSchema);
union {
char buf[sizeof(int64_t)];
tstr str;
} nullVal;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
char* val = (char*)row[i];
if (IS_VAR_DATA_TYPE(c->type)) {
if (val == NULL) {
val = nullVal.buf;
if (c->type == TSDB_DATA_TYPE_BINARY) {
setNull(nullVal.str.data, TSDB_DATA_TYPE_BINARY, 1);
nullVal.str.len = 1;
} else {
setNull(nullVal.str.data, TSDB_DATA_TYPE_NCHAR, 4);
nullVal.str.len = 4;
}
} else {
val -= sizeof(VarDataLenT);
}
} else if (val == NULL) {
val = nullVal.buf;
setNull(val, c->type, c->bytes);
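// a NULL cell now points straight at the shared per-type NULL constant; var-length
// values are rewound by sizeof(VarDataLenT) so the value includes its length header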
void* val = row[i];
if (val == NULL) {
val = getNullValue(c->type);
} else if (IS_VAR_DATA_TYPE(c->type)) {
val = ((char*)val) - sizeof(VarDataLenT);
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
......
......@@ -165,6 +165,7 @@ bool isNull(const char *val, int32_t type);
void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void* getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
#include "taoserror.h"
#include "trpc.h"
#include "tdataformat.h"
// message type
......@@ -674,7 +675,7 @@ typedef struct SMultiTableMeta {
typedef struct {
int32_t dataLen;
char name[TSDB_TABLE_ID_LEN];
char data[TSDB_MAX_TAGS_LEN];
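// the tag payload is now a serialized SKVRow, so reserve room for the KV-row head
// and one SColIdx entry per tag in addition to the raw tag values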
char data[TSDB_MAX_TAGS_LEN + TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * TSDB_MAX_TAGS];
} STagData;
/*
......
......@@ -141,10 +141,15 @@ HttpContext *httpGetContext(void *ptr) {
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpDebug("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount);
httpDebug("context:%p, is releasd, refCount:%d", pContext, refCount);
HttpContext **ppContext = pContext->ppContext;
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
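// the context cache may already have been released during http shutdown; only hand
// the context back to the cache while it still exists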
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
// httpDestroyContext((void **)(&ppContext));
}
}
bool httpInitContext(HttpContext *pContext) {
......
......@@ -157,7 +157,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
pParser->method.pos[pParser->method.len] = 0;
pParser->pLast = pSeek + 1;
httpDebug("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos);
httpTrace("context:%p, fd:%d, ip:%s, httpMethod:%s", pContext, pContext->fd, pContext->ipstr, pParser->method.pos);
return true;
}
......@@ -186,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser;
if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) {
pParser->data.len = (int32_t)atoi(pParser->pLast + 16);
httpDebug("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr,
httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr,
pParser->data.len);
} else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) {
if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) {
pContext->acceptEncoding = HTTP_COMPRESS_GZIP;
httpDebug("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
} else {
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
httpDebug("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
}
} else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) {
if (strstr(pParser->pLast + 18, "gzip") != NULL) {
pContext->contentEncoding = HTTP_COMPRESS_GZIP;
httpDebug("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
} else {
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
httpDebug("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
}
} else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) {
if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) {
......@@ -210,7 +210,7 @@ bool httpParseHead(HttpContext* pContext) {
} else {
pContext->httpKeepAlive = HTTP_KEEPALIVE_DISABLE;
}
httpDebug("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr,
httpTrace("context:%p, fd:%d, ip:%s, keepAlive:%d", pContext, pContext->fd, pContext->ipstr,
pContext->httpKeepAlive);
} else if (strncasecmp(pParser->pLast, "Transfer-Encoding: ", 19) == 0) {
if (strncasecmp(pParser->pLast + 19, "chunked", 7) == 0) {
......@@ -281,7 +281,7 @@ bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
httpParseChunkedBody(pContext, pParser, false);
return HTTP_CHECK_BODY_SUCCESS;
} else {
httpDebug("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr);
if (!httpReadDataImp(pContext)) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_CHECK_BODY_ERROR;
......@@ -299,7 +299,7 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
return HTTP_CHECK_BODY_ERROR;
} else if (dataReadLen < pParser->data.len) {
httpDebug("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read",
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read",
pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len);
return HTTP_CHECK_BODY_CONTINUE;
} else {
......@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) {
return true;
}
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds,
pContext->parser.bufsize, pContext->parser.buffer);
httpTraceDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd,
pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize,
pContext->parser.buffer);
if (!httpGetHttpMethod(pContext)) {
return false;
......
......@@ -76,8 +76,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) {
httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s",
pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
} else {
httpDebug("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s",
pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd,
pContext->ipstr, sz, writeSz, buf);
}
return writeSz;
......@@ -99,7 +99,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
uint64_t srcLen = (uint64_t) (buf->lst - buf->buf);
if (buf->pContext->fd <= 0) {
httpDebug("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
buf->pContext->fd = -1;
}
......@@ -113,11 +113,11 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) {
if (buf->lst == buf->buf) {
httpDebug("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
return 0; // there is no data to dump.
} else {
int len = sprintf(sLen, "%lx\r\n", srcLen);
httpDebug("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s",
httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", response:\n%s",
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen);
......@@ -129,12 +129,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if (ret == 0) {
if (compressBufLen > 0) {
int len = sprintf(sLen, "%x\r\n", compressBufLen);
httpDebug("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s",
httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s",
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen);
} else {
httpDebug("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s",
httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s",
buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf);
return 0; // there is no data to dump.
}
......@@ -173,7 +173,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) {
void httpWriteJsonBufEnd(JsonBuf* buf) {
if (buf->pContext->fd <= 0) {
httpDebug("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
httpTrace("context:%p, fd:%d, ip:%s, json buf fd is 0", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
buf->pContext->fd = -1;
}
......
......@@ -66,8 +66,6 @@ void httpCleanUpConnect() {
}
}
tfree(pServer->pThreads);
pServer->pThreads = NULL;
httpDebug("http server:%s is cleaned up", pServer->label);
}
......
......@@ -95,11 +95,13 @@ void httpCleanUpSystem() {
httpInfo("http server cleanup");
httpStopSystem();
httpCleanUpConnect();
httpCleanupContexts();
httpCleanUpSessions();
httpCleanUpConnect();
pthread_mutex_destroy(&tsHttpServer.serverMutex);
tfree(tsHttpServer.pThreads);
tsHttpServer.pThreads = NULL;
tsHttpServer.status = HTTP_SERVER_CLOSED;
}
......
......@@ -233,26 +233,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
// Decode tag values
if (pMsg->tagDataLen) {
int accBytes = 0;
int32_t tagDataLen = htonl(pMsg->tagDataLen);
if (tagDataLen) {
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
if (tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
accBytes += htons(pSchema[i].bytes);
}
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder);
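// the client now ships tag data already encoded as an SKVRow, so it can be passed
// to the table config directly instead of being rebuilt here column by column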
tsdbTableSetTagValue(pCfg, pTagData, true);
}
}
......@@ -620,6 +604,10 @@ static char *getTagIndexKey(const void *pData) {
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN);
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
if (res == NULL) {
// treat the column as NULL if we cannot find it
res = getNullValue(pCol->type);
}
return res;
}
......
......@@ -119,7 +119,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
uDebug("key:%s is removed from cache,total:%" PRId64 ",size:%dbytes", pNode->key, pCacheObj->totalSize, size);
uDebug("key:%s, is removed from cache, total:%" PRId64 " size:%d bytes", pNode->key, pCacheObj->totalSize, size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode);
}
......@@ -288,14 +288,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
if (NULL != pNode) {
pCacheObj->totalSize += pNode->size;
uDebug("key:%s %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes",
uDebug("key:%s, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%" PRId64 ", size:%" PRId64 " bytes",
key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize);
} else {
uError("key:%s failed to added into cache, out of memory", key);
uError("key:%s, failed to added into cache, out of memory", key);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("key:%s %p exist in cache, updated", key, pNode);
uDebug("key:%s, %p exist in cache, updated", key, pNode);
}
__cache_unlock(pCacheObj);
......@@ -321,10 +321,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
uDebug("key:%s, is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%s not in cache, retrieved failed", key);
uDebug("key:%s, not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
......@@ -350,10 +350,10 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uin
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
uDebug("key:%s, expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%s not in cache, retrieved failed", key);
uDebug("key:%s, not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
......@@ -410,13 +410,13 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) {
uError("key: %p release invalid cache data", pNode);
uError("%p release invalid cache data", pNode);
return;
}
*data = NULL;
int32_t ref = T_REF_DEC(pNode);
uDebug("%p data released, refcnt:%d", pNode, ref);
uDebug("key:%s, is released, %p refcnt:%d", pNode->key, pNode, ref);
if (_remove) {
__cache_wr_lock(pCacheObj);
......@@ -501,7 +501,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pNode->inTrashCan = true;
pCacheObj->numOfElemsInTrash++;
uDebug("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
uDebug("key:%s, %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
}
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
......@@ -549,7 +549,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
uDebug("key:%s, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
......@@ -570,8 +570,11 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
//}
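// only reclaim nodes whose reference count has dropped to zero; nodes still
// referenced are skipped and logged instead of being freed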
if (T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
uDebug("key:%s, will not remove from cache, refcnt:%d", pNode->key, T_REF_VAL_GET(pNode));
}
}
taosHashDestroyIter(pIter);
......