提交 d2ef1dad 编写于 作者: H hjxilinx

[tbase-1287]

上级 2ab2e283
...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { ...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
STableDataBlocks* tscCreateDataBlock(int32_t size); STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset); uint32_t offset);
...@@ -78,9 +78,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDa ...@@ -78,9 +78,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDa
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId); int32_t startOffset, int32_t rowSize, const char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
......
...@@ -231,17 +231,22 @@ typedef struct SParamInfo { ...@@ -231,17 +231,22 @@ typedef struct SParamInfo {
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char meterId[TSDB_METER_ID_LEN]; char meterId[TSDB_METER_ID_LEN];
int8_t tsSource; int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfMeters; // number of tables in current submit block
int64_t vgid; int32_t rowSize; // row size for current table
int64_t prevTS;
int32_t numOfMeters;
int32_t rowSize;
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t size; uint32_t size;
/*
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache
*/
SMeterMeta* pMeterMeta;
union { union {
char *filename; char *filename;
char *pData; char *pData;
...@@ -255,8 +260,8 @@ typedef struct STableDataBlocks { ...@@ -255,8 +260,8 @@ typedef struct STableDataBlocks {
typedef struct SDataBlockList { typedef struct SDataBlockList {
int32_t idx; int32_t idx;
int32_t nSize; uint32_t nSize;
int32_t nAlloc; uint32_t nAlloc;
char * userParam; /* user assigned parameters for async query */ char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */ void * udfp; /* user defined function pointer, used in async model */
STableDataBlocks **pData; STableDataBlocks **pData;
......
...@@ -985,7 +985,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -985,7 +985,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
strcpy(fname, full_path.we_wordv[0]); strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path); wordfree(&full_path);
STableDataBlocks *pDataBlock = tscCreateDataBlockEx(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize,
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
...@@ -1222,8 +1222,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1222,8 +1222,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = pMeterMeta->rowSize; int32_t rowSize = pMeterMeta->rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize,
tscCreateDataBlockEx(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMetaInfo->name);
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);
......
...@@ -451,15 +451,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -451,15 +451,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
} }
STableDataBlocks* tscCreateDataBlock(int32_t size) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t)size;
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
return dataBuf;
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
...@@ -467,6 +458,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { ...@@ -467,6 +458,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->pData); tfree(pDataBlock->pData);
tfree(pDataBlock->params); tfree(pDataBlock->params);
// free the refcount for metermeta
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pDataBlock->pMeterMeta), false);
tfree(pDataBlock); tfree(pDataBlock);
} }
...@@ -513,11 +507,11 @@ SDataBlockList* tscCreateBlockArrayList() { ...@@ -513,11 +507,11 @@ SDataBlockList* tscCreateBlockArrayList() {
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
if (pList->nSize >= pList->nAlloc) { if (pList->nSize >= pList->nAlloc) {
pList->nAlloc = pList->nAlloc << 1; pList->nAlloc = (pList->nAlloc) << 1U;
pList->pData = realloc(pList->pData, sizeof(void*) * (size_t)pList->nAlloc); pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc);
// reset allocated memory // reset allocated memory
memset(pList->pData + pList->nSize, 0, sizeof(void*) * (pList->nAlloc - pList->nSize)); memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize));
} }
pList->pData[pList->nSize++] = pBlocks; pList->pData[pList->nSize++] = pBlocks;
...@@ -539,11 +533,22 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) { ...@@ -539,11 +533,22 @@ void* tscDestroyBlockArrayList(SDataBlockList* pList) {
} }
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(pDataBlock->pMeterMeta != NULL);
pCmd->count = pDataBlock->numOfMeters; pCmd->count = pDataBlock->numOfMeters;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
//set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId); strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
taosRemoveDataFromCache(tscCacheHandle, (void**) &(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta;
pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
}
/* /*
* the submit message consists of : [RPC header|message body|digest] * the submit message consists of : [RPC header|message body|digest]
...@@ -551,7 +556,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -551,7 +556,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* additional space. * additional space.
*/ */
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest)); int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + sizeof(STaosDigest));
if (TSDB_CODE_SUCCESS != ret) return ret; if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
/* /*
...@@ -561,7 +569,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -561,7 +569,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest)); assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + sizeof(STaosDigest));
return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); return TSDB_CODE_SUCCESS;
} }
void tscFreeUnusedDataBlocks(SDataBlockList* pList) { void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
...@@ -573,19 +581,38 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { ...@@ -573,19 +581,38 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
} }
} }
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) { /**
STableDataBlocks* dataBuf = tscCreateDataBlock(size); * create the in-memory buffer for each table to keep the submitted data block
* @param initialSize
* @param rowSize
* @param startOffset
* @param name
* @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks
* @return
*/
STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t) initialSize;
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize; dataBuf->rowSize = rowSize;
dataBuf->size = startOffset; dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
// sure that the metermeta must be in the local client cache
dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId);
assert(dataBuf->pMeterMeta != NULL && initialSize > 0);
return dataBuf; return dataBuf;
} }
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId) { int32_t startOffset, int32_t rowSize, const char* tableId) {
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id); STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id);
...@@ -594,7 +621,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData ...@@ -594,7 +621,7 @@ STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pData
} }
if (dataBuf == NULL) { if (dataBuf == NULL) {
dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId); dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf); tscAppendDataBlock(pDataBlockList, dataBuf);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册