From 169338eaaf2e97196b8aacef98864f631a37aec2 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Aug 2019 11:31:26 +0800 Subject: [PATCH] fix issue #280 --- src/system/src/vnodeImport.c | 52 +++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index a49c9a5619..d2493a5830 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -41,7 +41,7 @@ typedef struct { } SHeadInfo; typedef struct { - void * signature; + void *signature; SShellObj *pShell; SMeterObj *pObj; int retry; @@ -58,9 +58,10 @@ typedef struct { int numOfPoints; int fileId; int64_t offset; // offset in data file - SData * sdata[TSDB_MAX_COLUMNS]; - char * buffer; - char * payload; + SData *sdata[TSDB_MAX_COLUMNS]; + char *buffer; + char *payload; + char *opayload; // allocated space for payload from client int rows; } SImportInfo; @@ -176,9 +177,9 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[] } int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg * pCfg = &pVnode->cfg; + SMeterObj *pObj = pImport->pObj; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; + SVnodeCfg *pCfg = &pVnode->cfg; TSKEY firstKey = *((TSKEY *)payload); struct stat filestat; int sid, rowsBefore = 0; @@ -276,10 +277,10 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { return; } - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; + SMeterObj *pObj = pImport->pObj; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj * pShell = pImport->pShell; + SShellObj *pShell = pImport->pShell; pImport->retry++; @@ -334,18 +335,18 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { } pImport->signature = NULL; - free(pImport->payload); + free(pImport->opayload); free(pImport); } int vnodeImportToFile(SImportInfo *pImport) { - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg * pCfg = &pVnode->cfg; + SMeterObj *pObj = pImport->pObj; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; + SVnodeCfg *pCfg = &pVnode->cfg; SHeadInfo headInfo; int code = 0, col; SCompBlock compBlock; - char * payload = pImport->payload; + char *payload = pImport->payload; int rows = pImport->rows; SCachePool *pPool = (SCachePool *)pVnode->pCachePool; @@ -516,9 +517,9 @@ _exit: } int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg * pCfg = &pVnode->cfg; + SMeterObj *pObj = pImport->pObj; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; + SVnodeCfg *pCfg = &pVnode->cfg; int code = -1; SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; int slot, pos, row, col, points, tpoints; @@ -627,8 +628,8 @@ _exit: } int vnodeFindKeyInFile(SImportInfo *pImport, int order) { - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; + SMeterObj *pObj = pImport->pObj; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int code = -1; SQuery query; SColumnFilter colList[TSDB_MAX_COLUMNS] = {0}; @@ -720,7 +721,7 @@ int vnodeFindKeyInFile(SImportInfo *pImport, int order) { } int vnodeFindKeyInCache(SImportInfo *pImport, int order) { - SMeterObj * pObj = pImport->pObj; + SMeterObj *pObj = pImport->pObj; int code = 0; SQuery query; SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; @@ -847,12 +848,12 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, int *pNumOfPoints) { SSubmitMsg *pSubmit = (SSubmitMsg *)cont; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int rows; - char * payload; + char *payload; int code = TSDB_CODE_ACTION_IN_PROGRESS; SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj * pShell = (SShellObj *)param; + SShellObj *pShell = (SShellObj *)param; int pointsImported = 0; rows = htons(pSubmit->numOfRows); @@ -923,6 +924,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pNew->signature = pNew; int payloadLen = contLen - sizeof(SSubmitMsg); pNew->payload = malloc(payloadLen); + pNew->opayload = pNew->payload; memcpy(pNew->payload, payload, payloadLen); dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, @@ -963,7 +965,7 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows); } - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; + SVnodeObj *pVnode = &vnodeList[pObj->vnode]; SCachePool *pPool = (SCachePool *)pVnode->pCachePool; pPool->commitInProcess = 0; -- GitLab