/* taosmigrateMnodeWal.c (committed by Hui Li) */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taosmigrate.h"

/*
 * Write one WAL record (head immediately followed by pHead->len body bytes)
 * to fd.
 *
 * Before writing, taosCalcChecksumAppend() is invoked over the
 * sizeof(SWalHead) bytes of the head; presumably this refreshes the checksum
 * stored in the head after the record was edited -- confirm against its
 * definition.
 *
 * write() may legitimately transfer fewer bytes than requested or be
 * interrupted by a signal, so the call is repeated until the whole record has
 * been written. Any other failure is fatal: the message is printed and the
 * process exits with status -1.
 *
 * fd     descriptor opened for writing
 * pHead  record to write; its body must directly follow the head in memory
 */
static void recordWrite(int fd, SWalHead *pHead) {

  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));

  int contLen = pHead->len + sizeof(SWalHead);

  const char *cursor = (const char *)pHead;
  int remaining = contLen;

  while (remaining > 0) {
    ssize_t written = write(fd, cursor, (size_t)remaining);

    if (written < 0 && errno == EINTR) {
      continue;  // interrupted before any byte was transferred, simply retry
    }

    if (written <= 0) {
      // a hard error, or no progress at all: give up instead of spinning
      printf("failed to write(%s)", strerror(errno));
      exit(-1);
    }

    cursor += written;
    remaining -= (int)written;
  }
}

/*
 * Patch the endpoint fields of a dnode/mnode record in place.
 *
 * The table id is derived as msgType / 10. Only records of SDB_TABLE_DNODE
 * and SDB_TABLE_MNODE are touched; every other record is left untouched.
 * For a matching record the current values are printed, then getDnodeInfo()
 * is asked for an entry by dnodeId; when it returns one, the port, fqdn and
 * ep of the record are overwritten with the values of that entry.
 *
 * NOTE(review): mnode records are interpreted through SDnodeObj as well --
 * confirm that both record kinds really share this layout.
 *
 * pHead  record whose body (pHead->cont) is modified in place
 */
static void recordMod(SWalHead* pHead) 
{
  ESdbTable table = (ESdbTable)(pHead->msgType / 10);

  /* Nothing to rewrite for tables other than dnode and mnode. */
  if (table != SDB_TABLE_DNODE && table != SDB_TABLE_MNODE) {
    return;
  }

  SDnodeObj *node = (SDnodeObj *)pHead->cont;

  printf("dnodeId:%d  port:%d  fqdn:%s  ep:%s\n", node->dnodeId, node->dnodePort, node->dnodeFqdn, node->dnodeEp);

  /* No entry for this dnode id: keep the record exactly as it was read. */
  SdnodeIfo *replacement = getDnodeInfo(node->dnodeId);
  if (replacement != NULL) {
    node->dnodePort = replacement->port;
    tstrncpy(node->dnodeFqdn, replacement->fqdn, sizeof(node->dnodeFqdn));
    tstrncpy(node->dnodeEp, replacement->ep, sizeof(node->dnodeEp));
  }
}

void walModWalFile(char* walfile) {
  char *buffer = malloc(1024000);  // size for one record
  if (buffer == NULL) {
    printf("failed to malloc:%s\n", strerror(errno));
    return ;
  }

  SWalHead *pHead = (SWalHead *)buffer;

  int rfd = open(walfile, O_RDONLY);
  if (rfd < 0) {
    printf("failed to open %s failed:%s\n", walfile, strerror(errno));
    free(buffer);
    return ;
  }

  char newWalFile[32] = "wal0";
  int wfd = open(newWalFile, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);

  if (wfd < 0) {
    printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno));
    free(buffer);
H
Hui Li 已提交
99
    close(rfd);
H
Hui Li 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
    return ;
  }

  printf("start to mod %s into %s\n", walfile, newWalFile);

  while (1) {
    memset(buffer, 0, 1024000);
    int ret = read(rfd, pHead, sizeof(SWalHead));
    if ( ret == 0)  break;  

    if (ret != sizeof(SWalHead)) {
      printf("wal:%s, failed to read head, skip, ret:%d(%s)\n", walfile, ret, strerror(errno));
      break;
    }

    if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
      printf("wal:%s, cksum is messed up, skip the rest of file\n", walfile);
      break;
    } 

H
Hui Li 已提交
120 121 122 123 124
    if (pHead->len >= 1024000 - sizeof(SWalHead)) {
      printf("wal:%s, SWalHead.len(%d) overflow, skip the rest of file\n", walfile, pHead->len);
      break;
    } 

H
Hui Li 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    ret = read(rfd, pHead->cont, pHead->len);
    if ( ret != pHead->len) {
      printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret);
      break;
    }

    recordMod(pHead);
    recordWrite(wfd, pHead);
  }

  close(rfd);
  close(wfd);
  free(buffer);

  taosMvFile(walfile, newWalFile);

  return ;
}