// OffsetManager.java
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agentstream.worker.segment.buffer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agentstream.config.BufferFileConfig;
import org.skywalking.apm.collector.agentstream.worker.util.FileUtils;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author pengys5
 */
public enum OffsetManager {
    INSTANCE;

    private final Logger logger = LoggerFactory.getLogger(OffsetManager.class);

    private static final String OFFSET_FILE_PREFIX = "offset";
    private File offsetFile;
    private Offset offset;
    private boolean initialized = false;
    private RandomAccessFile randomAccessFile = null;
    private String lastOffsetRecord = Const.EMPTY_STRING;

    public synchronized void initialize() throws IOException {
        if (!initialized) {
            this.offset = new Offset();
            File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
            if (dataPath.mkdirs()) {
                createOffsetFile();
            } else {
                File[] offsetFiles = dataPath.listFiles(new PrefixFileNameFilter());
                if (CollectionUtils.isNotEmpty(offsetFiles) && offsetFiles.length > 0) {
                    for (int i = 0; i < offsetFiles.length; i++) {
                        if (i != offsetFiles.length - 1) {
                            offsetFiles[i].delete();
                        } else {
                            offsetFile = offsetFiles[i];
                        }
                    }
                } else {
                    createOffsetFile();
                }
            }
            String offsetRecord = FileUtils.INSTANCE.readLastLine(offsetFile);
            offset.deserialize(offsetRecord);
            initialized = true;

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
        }
    }

    private void createOffsetFile() throws IOException {
        String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
        String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
        offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
        this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
        this.offset.getWriteOffset().setWriteFileOffset(0);
        this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
        this.offset.getReadOffset().setReadFileOffset(0);
        this.flush();
    }

    public void flush() {
        String offsetRecord = offset.serialize();
        if (!lastOffsetRecord.equals(offsetRecord)) {
            if (offsetFile.length() >= BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE) {
P
peng-yongsheng 已提交
93
                nextFile();
94 95 96 97 98 99
            }
            FileUtils.INSTANCE.writeAppendToLast(offsetFile, randomAccessFile, offsetRecord);
            lastOffsetRecord = offsetRecord;
        }
    }

P
peng-yongsheng 已提交
100
    private void nextFile() {
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
        String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
        String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
        File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
        offsetFile.delete();
        offsetFile = newOffsetFile;
        this.flush();
    }

    public String getReadFileName() {
        return offset.getReadOffset().getReadFileName();
    }

    public long getReadFileOffset() {
        return offset.getReadOffset().getReadFileOffset();
    }

    public void setReadOffset(long readFileOffset) {
        offset.getReadOffset().setReadFileOffset(readFileOffset);
    }

    public void setReadOffset(String readFileName, long readFileOffset) {
        offset.getReadOffset().setReadFileName(readFileName);
        offset.getReadOffset().setReadFileOffset(readFileOffset);
    }

    public String getWriteFileName() {
        return offset.getWriteOffset().getWriteFileName();
    }

    public long getWriteFileOffset() {
        return offset.getWriteOffset().getWriteFileOffset();
    }

    public void setWriteOffset(String writeFileName, long writeFileOffset) {
        offset.getWriteOffset().setWriteFileName(writeFileName);
        offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
    }

    public void setWriteOffset(long writeFileOffset) {
        offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
    }

    class PrefixFileNameFilter implements FilenameFilter {
        @Override public boolean accept(File dir, String name) {
P
peng-yongsheng 已提交
145
            return name.startsWith(OFFSET_FILE_PREFIX);
146 147 148
        }
    }
}