提交 169338ea 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix issue #280

上级 2470e3f5
...@@ -41,7 +41,7 @@ typedef struct { ...@@ -41,7 +41,7 @@ typedef struct {
} SHeadInfo; } SHeadInfo;
typedef struct { typedef struct {
void * signature; void *signature;
SShellObj *pShell; SShellObj *pShell;
SMeterObj *pObj; SMeterObj *pObj;
int retry; int retry;
...@@ -58,9 +58,10 @@ typedef struct { ...@@ -58,9 +58,10 @@ typedef struct {
int numOfPoints; int numOfPoints;
int fileId; int fileId;
int64_t offset; // offset in data file int64_t offset; // offset in data file
SData * sdata[TSDB_MAX_COLUMNS]; SData *sdata[TSDB_MAX_COLUMNS];
char * buffer; char *buffer;
char * payload; char *payload;
char *opayload; // allocated space for payload from client
int rows; int rows;
} SImportInfo; } SImportInfo;
...@@ -176,9 +177,9 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[] ...@@ -176,9 +177,9 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]
} }
int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) {
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg * pCfg = &pVnode->cfg; SVnodeCfg *pCfg = &pVnode->cfg;
TSKEY firstKey = *((TSKEY *)payload); TSKEY firstKey = *((TSKEY *)payload);
struct stat filestat; struct stat filestat;
int sid, rowsBefore = 0; int sid, rowsBefore = 0;
...@@ -276,10 +277,10 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -276,10 +277,10 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
return; return;
} }
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj * pShell = pImport->pShell; SShellObj *pShell = pImport->pShell;
pImport->retry++; pImport->retry++;
...@@ -334,18 +335,18 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -334,18 +335,18 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
} }
pImport->signature = NULL; pImport->signature = NULL;
free(pImport->payload); free(pImport->opayload);
free(pImport); free(pImport);
} }
int vnodeImportToFile(SImportInfo *pImport) { int vnodeImportToFile(SImportInfo *pImport) {
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg * pCfg = &pVnode->cfg; SVnodeCfg *pCfg = &pVnode->cfg;
SHeadInfo headInfo; SHeadInfo headInfo;
int code = 0, col; int code = 0, col;
SCompBlock compBlock; SCompBlock compBlock;
char * payload = pImport->payload; char *payload = pImport->payload;
int rows = pImport->rows; int rows = pImport->rows;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
...@@ -516,9 +517,9 @@ _exit: ...@@ -516,9 +517,9 @@ _exit:
} }
int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg * pCfg = &pVnode->cfg; SVnodeCfg *pCfg = &pVnode->cfg;
int code = -1; int code = -1;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
int slot, pos, row, col, points, tpoints; int slot, pos, row, col, points, tpoints;
...@@ -627,8 +628,8 @@ _exit: ...@@ -627,8 +628,8 @@ _exit:
} }
int vnodeFindKeyInFile(SImportInfo *pImport, int order) { int vnodeFindKeyInFile(SImportInfo *pImport, int order) {
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int code = -1; int code = -1;
SQuery query; SQuery query;
SColumnFilter colList[TSDB_MAX_COLUMNS] = {0}; SColumnFilter colList[TSDB_MAX_COLUMNS] = {0};
...@@ -720,7 +721,7 @@ int vnodeFindKeyInFile(SImportInfo *pImport, int order) { ...@@ -720,7 +721,7 @@ int vnodeFindKeyInFile(SImportInfo *pImport, int order) {
} }
int vnodeFindKeyInCache(SImportInfo *pImport, int order) { int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
SMeterObj * pObj = pImport->pObj; SMeterObj *pObj = pImport->pObj;
int code = 0; int code = 0;
SQuery query; SQuery query;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
...@@ -847,12 +848,12 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { ...@@ -847,12 +848,12 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) {
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints) { int *pNumOfPoints) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int rows; int rows;
char * payload; char *payload;
int code = TSDB_CODE_ACTION_IN_PROGRESS; int code = TSDB_CODE_ACTION_IN_PROGRESS;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj * pShell = (SShellObj *)param; SShellObj *pShell = (SShellObj *)param;
int pointsImported = 0; int pointsImported = 0;
rows = htons(pSubmit->numOfRows); rows = htons(pSubmit->numOfRows);
...@@ -923,6 +924,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -923,6 +924,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pNew->signature = pNew; pNew->signature = pNew;
int payloadLen = contLen - sizeof(SSubmitMsg); int payloadLen = contLen - sizeof(SSubmitMsg);
pNew->payload = malloc(payloadLen); pNew->payload = malloc(payloadLen);
pNew->opayload = pNew->payload;
memcpy(pNew->payload, payload, payloadLen); memcpy(pNew->payload, payload, payloadLen);
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
...@@ -963,7 +965,7 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { ...@@ -963,7 +965,7 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows); code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows);
} }
SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
pPool->commitInProcess = 0; pPool->commitInProcess = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册