diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3deb4c463fc77e9b7dd8613514d954df96fd2337..5d828d7cf0aaedd7abc20b134b24219212e1e8cb 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { } SJoinSubquerySupporter; void tscDestroyDataBlock(STableDataBlocks* pDataBlock); -STableDataBlocks* tscCreateDataBlock(int32_t size); +STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); @@ -78,9 +78,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDa void tscFreeUnusedDataBlocks(SDataBlockList* pList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, char* tableId); -STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name); - + int32_t startOffset, int32_t rowSize, const char* tableId); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4de33f5ac49fee7979393c6f827d3a5a604e5d14..be9ba47f2a7827eecc52d6d8263cd73ad8ffcb8a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -231,17 +231,22 @@ typedef struct SParamInfo { typedef struct STableDataBlocks { char meterId[TSDB_METER_ID_LEN]; - int8_t tsSource; - bool ordered; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgid; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfMeters; // number of tables in current submit block - int64_t vgid; - int64_t prevTS; - - int32_t numOfMeters; - - int32_t rowSize; + int32_t rowSize; // row size for current table uint32_t nAllocSize; uint32_t size; + + /* + * the metermeta for current table, the metermeta will be used during submit stage, keep a ref + * to avoid it to be removed from cache + */ + SMeterMeta* pMeterMeta; + union { char *filename; char *pData; @@ -255,8 +260,8 @@ typedef struct STableDataBlocks { typedef struct SDataBlockList { int32_t idx; - int32_t nSize; - int32_t nAlloc; + uint32_t nSize; + uint32_t nAlloc; char * userParam; /* user assigned parameters for async query */ void * udfp; /* user defined function pointer, used in async model */ STableDataBlocks **pData; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6317decbe020e52e0b3747b67892d09cca8fb3e4..a07ad6f2117aabf13b3a2d67d45af39426fa419e 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -985,7 +985,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { strcpy(fname, full_path.we_wordv[0]); wordfree(&full_path); - STableDataBlocks *pDataBlock = tscCreateDataBlockEx(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, + STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); @@ -1222,8 +1222,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { int32_t rowSize = pMeterMeta->rowSize; pCmd->pDataBlocks = tscCreateBlockArrayList(); - STableDataBlocks *pTableDataBlock = - tscCreateDataBlockEx(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); + STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, + sizeof(SShellSubmitBlock), pMeterMetaInfo->name); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 288e564cb0b9e11d571e85160ef74dfbbdea74ed..7c7310a1c7e12fb607920cfeb32f1f6480de8fd8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -451,15 +451,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { free(pSql); } -STableDataBlocks* tscCreateDataBlock(int32_t size) { - STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); - dataBuf->nAllocSize = (uint32_t)size; - dataBuf->pData = calloc(1, dataBuf->nAllocSize); - dataBuf->ordered = true; - dataBuf->prevTS = INT64_MIN; - return dataBuf; -} - void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { if (pDataBlock == NULL) { return; @@ -467,6 +458,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock->pData); tfree(pDataBlock->params); + + // free the refcount for metermeta + taosRemoveDataFromCache(tscCacheHandle, (void**) &(pDataBlock->pMeterMeta), false); tfree(pDataBlock); } @@ -513,11 +507,11 @@ SDataBlockList* tscCreateBlockArrayList() { void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { if (pList->nSize >= pList->nAlloc) { - pList->nAlloc = pList->nAlloc << 1; - pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); + pList->nAlloc = (pList->nAlloc) << 1U; + pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc); // reset allocated memory - memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); + memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize)); } pList->pData[pList->nSize++] = pBlocks; @@ -539,29 +533,43 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) { } int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { - SSqlCmd* pCmd = &pSql->cmd; - + SSqlCmd *pCmd = &pSql->cmd; + assert(pDataBlock->pMeterMeta != NULL); + pCmd->count = pDataBlock->numOfMeters; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); - strcpy(pMeterMetaInfo->name, pDataBlock->meterId); - + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + + //set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache + if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { + strcpy(pMeterMetaInfo->name, pDataBlock->meterId); + taosRemoveDataFromCache(tscCacheHandle, (void**) &(pMeterMetaInfo->pMeterMeta), false); + + pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta; + pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo + } else { + assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); + } + /* * the submit message consists of : [RPC header|message body|digest] * the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs * additional space. */ int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest)); - if (TSDB_CODE_SUCCESS != ret) return ret; + if (TSDB_CODE_SUCCESS != ret) { + return ret; + } + memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); - + /* * the payloadLen should be actual message body size * the old value of payloadLen is the allocated payload size */ pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; - + assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest)); - return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + return TSDB_CODE_SUCCESS; } void tscFreeUnusedDataBlocks(SDataBlockList* pList) { @@ -573,19 +581,38 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { } } -STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { - STableDataBlocks* dataBuf = tscCreateDataBlock(size); +/** + * create the in-memory buffer for each table to keep the submitted data block + * @param initialSize + * @param rowSize + * @param startOffset + * @param name + * @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks + * @return + */ +STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) { + + STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); + dataBuf->nAllocSize = (uint32_t) initialSize; + dataBuf->pData = calloc(1, dataBuf->nAllocSize); + dataBuf->ordered = true; + dataBuf->prevTS = INT64_MIN; dataBuf->rowSize = rowSize; dataBuf->size = startOffset; dataBuf->tsSource = -1; strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); + + // sure that the metermeta must be in the local client cache + dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId); + assert(dataBuf->pMeterMeta != NULL && initialSize > 0); + return dataBuf; } STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, char* tableId) { + int32_t startOffset, int32_t rowSize, const char* tableId) { STableDataBlocks* dataBuf = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id); @@ -594,7 +621,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData } if (dataBuf == NULL) { - dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); + dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf); }