From 50b67b19cc0e7bc9605e99249c630457d9ddc9a1 Mon Sep 17 00:00:00 2001 From: dangyifei Date: Wed, 7 Aug 2019 18:00:23 +0800 Subject: [PATCH] add kv to seqfile and read seqfile tool --- cube/cube-builder/CMakeLists.txt | 2 + cube/cube-builder/tool/kv_to_seqfile.py | 73 +++++++ cube/cube-builder/tool/kvtool.py | 274 ++++++++++++++++++++++++ cube/cube-builder/tool/source/file.txt | 18 ++ 4 files changed, 367 insertions(+) create mode 100644 cube/cube-builder/tool/kv_to_seqfile.py create mode 100644 cube/cube-builder/tool/kvtool.py create mode 100644 cube/cube-builder/tool/source/file.txt diff --git a/cube/cube-builder/CMakeLists.txt b/cube/cube-builder/CMakeLists.txt index 84fe7674..0ad85ddc 100755 --- a/cube/cube-builder/CMakeLists.txt +++ b/cube/cube-builder/CMakeLists.txt @@ -47,3 +47,5 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB}) # install install(TARGETS cube-builder RUNTIME DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/bin) + +install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/tool DESTINATION ${PADDLE_SERVING_INSTALL_DIR}) diff --git a/cube/cube-builder/tool/kv_to_seqfile.py b/cube/cube-builder/tool/kv_to_seqfile.py new file mode 100644 index 00000000..78295df8 --- /dev/null +++ b/cube/cube-builder/tool/kv_to_seqfile.py @@ -0,0 +1,73 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License
"""Convert a plain-text ``key:value`` file into a sequence file.

Each line of ``SOURCE_FILE`` is parsed as ``<integer key>:<value>`` and
written through ``kvtool.SequenceFileWriter`` to ``BASE_DATAFILE``; afterwards
one JSON line describing the output directory is appended to
``BASE_DONEFILE``.  Input and output locations are controlled by the
module-level constants below.
"""

import struct
import time
import datetime
import json
from collections import OrderedDict
import os
from kvtool import SequenceFileWriter

# Both stamps are taken once at import time so that the data directory name
# and the donefile entry written by one run refer to the same moment.
NOW_TIMESTAMP = time.time()
NOW_DATETIME = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
DATA_PATH = "./data/"
BASE_DATAFILE = DATA_PATH + NOW_DATETIME + "/base/feature"
BASE_DONEFILE = DATA_PATH + "donefile/base.txt"
# NOTE(review): unlike BASE_DATAFILE this is rooted at "./donefile/" rather
# than DATA_PATH.  Neither PATCH_* constant is used in this script, so the
# value is left as-is -- confirm the intended location before relying on it.
PATCH_DATAFILE = "./donefile/" + NOW_DATETIME + "/patch/feature"
PATCH_DONEFILE = DATA_PATH + "donefile/patch.txt"
SOURCE_FILE = './source/file.txt'


def _ensure_dir(path):
    """Create directory *path* (including parents) if nothing exists there."""
    if not os.path.exists(path):
        os.makedirs(path)


def _parse_kv_line(line):
    """Split one raw source line into ``(key, value)``.

    Args:
        line: one line of the source file as bytes, line ending included.

    Returns:
        ``(int_key, value_bytes)``, or ``None`` for a blank line.

    Raises:
        ValueError: the line has no ``:`` separator or the key is not an
            integer.
    """
    line = line.rstrip(b'\r\n')
    if not line.strip():
        return None
    # Split on the FIRST colon only, so values may themselves contain ':'.
    key_field, sep, value = line.partition(b':')
    if not sep:
        raise ValueError("expected 'key:value', got %r" % (line,))
    return int(key_field), value


def write_donefile():
    """Append one JSON record for the current run to ``BASE_DONEFILE``.

    The record has the fields ``id`` and ``key`` (both the integer import
    timestamp as a string) and ``input`` (absolute path of the directory that
    contains ``BASE_DATAFILE``).  The donefile directory is created on demand.
    """
    record = OrderedDict()
    record["id"] = str(int(NOW_TIMESTAMP))
    record["key"] = record["id"]
    record["input"] = os.path.abspath(os.path.dirname(BASE_DATAFILE))
    _ensure_dir(os.path.dirname(BASE_DONEFILE))
    with open(BASE_DONEFILE, "a+") as donefile:
        donefile.write(json.dumps(record) + '\n')


def kv_to_seqfile():
    """Write every ``key:value`` line of ``SOURCE_FILE`` to ``BASE_DATAFILE``.

    Keys are packed as native-order unsigned 64-bit integers (``'Q'``), the
    same format ``kvtool.test_reader`` unpacks; values are written as the raw
    bytes that follow the first ``:``.  Blank lines are skipped.  Finishes by
    calling ``write_donefile()``.

    Raises:
        IOError/OSError: the source cannot be read or the output not written.
        ValueError: a non-blank line is not of the form ``<int>:<value>``.
        struct.error: a key does not fit in an unsigned 64-bit integer.
    """
    # Read the source first: if it is missing we fail before creating an
    # output file that would contain nothing but a header.
    with open(SOURCE_FILE, 'rb') as source:
        lines = source.readlines()

    _ensure_dir(os.path.dirname(BASE_DATAFILE))
    with open(BASE_DATAFILE, "wb") as out:
        writer = SequenceFileWriter(out)
        for line in lines:
            parsed = _parse_kv_line(line)
            if parsed is None:
                continue
            key, value = parsed
            key_bytes = struct.pack('Q', key)
            # Single-argument print(...) is valid on both Python 2 and 3;
            # %r keeps the packed bytes readable on a terminal.
            print('%d : %r -> %r : %r' % (key, value, key_bytes, value))
            writer.write(key_bytes, value)
    write_donefile()


if __name__ == '__main__':
    # Convert the kv source file into a sequence file and append the matching
    # donefile entry.  Change the module-level constants (SOURCE_FILE,
    # DATA_PATH, ...) to point at other input/output locations.
    kv_to_seqfile()

# ---- patch framing retained verbatim from the original collapsed diff ----
# diff --git a/cube/cube-builder/tool/kvtool.py
# new file mode 100644
# index 00000000..39a9db63
# --- /dev/null
# +++
b/cube/cube-builder/tool/kvtool.py @@ -0,0 +1,274 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +import sys +import struct +import random + + +class Stream(object): + """ bytes stream like sys.stdin + """ + + def __init__(self, source=None, cache=None): + """ init + """ + self._src = source + self._cache_to = cache + self._cache_fd = None + + def read_bytes(self, num): + """read bytes""" + data = self._src.read(num) + if len(data) < num: + if self._cache_fd is not None: + if len(data) > 0: + self._cache_fd.write(data) + self._cache_fd.close() + print >> sys.stderr, 'succeed to cache file[%s]' % ( + self._cache_to) + self._cache_fd = None + raise EOFError + else: + if self._cache_to is not None: + if self._cache_fd is None: + self._cache_fd = open(self._cache_to, 'wb') + self._cache_fd.write(data) + return data + + def read_int(self): + """read int""" + data = self.read_bytes(4) + return struct.unpack('!i', data)[0] + + def read_byte(self): + """read byte""" + byte = self.read_bytes(1) + return struct.unpack('!b', byte)[0] + + def read_string(self): + """read string""" + str_len = self.read_vint() + return unicode(self.read_bytes(str_len), 'utf-8') + + def read_bool(self): + """read bool""" + return bool(self.read_byte()) + + def read_vint(self): + """read vint""" + first = self.read_byte() + l = self._decode_vint_size(first) + if l == 1: + return first + x = 0 + for i in range(l - 1): + b = 
self.read_byte() + x = x << 8 + x = x | (b & 0xFF) + if self._is_negative_vint(first): + return x ^ -1 + return x + + def _is_negative_vint(self, val): + """check is negative vint""" + return val < -120 or (val >= -122 and val < 0) + + def _decode_vint_size(self, val): + """decode vint size""" + if val >= -122: + return 1 + elif val < -120: + return -119 - val + return -111 - val + + def tell(self): + """ tell """ + return self._src.tell() + + def seek(self, pos): + """ seek """ + self._src.seek(pos) + + +class SequenceFileReader(): + """ a reader for sequencefile + """ + + def __init__(self, seqfile=None, cache=None): + """ init + """ + self.type = 'seqfile' + if seqfile is None: + seqfile = sys.stdin + self.stream = Stream(seqfile, cache=cache) + self.version = None + # self.key_class = None + self.compression_class = None + # self.value_class = None + self.compression = False + self.block_compression = False + self.meta = {} + self.sync = None + self._read_header() + if self.compression or self.block_compression: + raise NotImplementedError( + "reading of seqfiles with compression is not implemented.") + + def _read_header(self): + """ read sequencefile header + """ + stream = self.stream + seq = stream.read_bytes(3) + if seq != "SEQ": + raise ValueError("given file is not a sequence-file") + self.version = stream.read_byte() + self.key_class = stream.read_string() + self.value_class = stream.read_string() + self.compression = stream.read_bool() + self.block_compression = stream.read_bool() + if self.compression: + self.compression_class = stream.read_string() + meta_len = stream.read_int() + for i in range(meta_len): + key = stream.read_string() + val = stream.read_string() + self.meta[key] = val + self.sync = stream.read_bytes(16) + + def __iter__(self): + """ facilitate 'for i in reader:' + """ + while True: + try: + next = self.load() + except EOFError: + raise StopIteration + yield next + + def get_type(self): + """ get type of this reader + """ + return 
self.type + + def load(self): + """ read one record + """ + stream = self.stream + buf_len = stream.read_int() + if buf_len == -1: + syncCheck = stream.read_bytes(16) + if syncCheck != self.sync: + raise ValueError("file corrupt, no a valid sequencefile") + buf_len = stream.read_int() + key_len = stream.read_int() + buf = stream.read_bytes(buf_len) + return buf[:key_len], buf[key_len:] + + def tell(self): + """ tell the position of currently readed + """ + return self.stream.tell() + + def seek(self, pos): + """ seek to the specified position + """ + self.stream.seek(pos) + + +class SequenceFileWriter(object): + """A wrapper around file-like object for aid writing sequence files + """ + # sequence file header for uncompressed, version 6 sequence files + SEQ_HEADER = "SEQ\x06" \ + "\"org.apache.hadoop.io.BytesWritable\"" \ + "org.apache.hadoop.io.BytesWritable" \ + "\x00\x00\x00\x00\x00\x00" + # after writing how many bytes of actual data we insert a sync marker + SYNC_INTERVAL = 2000 + + def __init__(self, f): + """ Construct a sequencefile writer for specified file-like object f + """ + self._f = f + self._sync_marker = ''.join( + [chr(random.randint(0, 255)) for k in range(0, 16)]) + self._write_seq_header() + self._bytes_to_prev_sync = 0 + + def write(self, key, value): + """Write key-value record to this sequence file + Args: + key: key of this record, should be a str + value: value of this record, should be a str + Returns: + number of bytes written + """ + key_len = len(key) + record_len = key_len + len(value) + b_record_len = struct.pack('>I', record_len) + b_key_len = struct.pack('>I', key_len) + self._f.write(b_record_len + b_key_len) + self._f.write(key) + self._f.write(value) + self._bytes_to_prev_sync += record_len + if self._bytes_to_prev_sync >= SequenceFileWriter.SYNC_INTERVAL: + self._write_sync_marker() + self._bytes_to_prev_sync = 0 + + def _write_seq_header(self): + """Write sequence file header to the underlying file + """ + 
self._f.write(SequenceFileWriter.SEQ_HEADER) + self._f.write(self._sync_marker) + + def _write_sync_marker(self): + """Write sync marker to this sequence file + """ + self._f.write("\xff\xff\xff\xff") + self._f.write(self._sync_marker) + + +def get_reader(f=None, cache=None): + """ get a kv reader for a stream 'f' + and the type can be 'kvfile' or 'seqfile' + """ + return SequenceFileReader(f, cache=cache) + + +def test_reader(file_path): + """ test reader of sequencefile + """ + seqfile = file_path + f = open(seqfile, 'rb') + reader = get_reader(f) + # reader = get_reader(sys.stdin, filetype) + ct = 0 + for r in reader: + ct += 1 + k, v = r + if ct % 2 == 0: + print struct.unpack('Q', k)[0], v + print "read a record with klen:%d,vlen:%d with count:%d" \ + % (len(k), len(v), ct) + + +if __name__ == "__main__": + """ read sequence file to kv, need a param sequence file addr + """ + if len(sys.argv) != 2: + print "error, usage:python kvtool.py seqfile_path" + else: + file_path = sys.argv[1] + test_reader(file_path) diff --git a/cube/cube-builder/tool/source/file.txt b/cube/cube-builder/tool/source/file.txt new file mode 100644 index 00000000..c29c165d --- /dev/null +++ b/cube/cube-builder/tool/source/file.txt @@ -0,0 +1,18 @@ +1:1 +2:2 +3:3 +4:4 +5:5 +7:7 +8:8 +9:9 +10:10 11 12 +11:this is eleven +12:value can string +1676869128226002114:48241 37064 91 -539 114 51 -122 269 229 -134 -282 +1657749292782759014:167 40 98 27 117 10 -29 15 74 67 -54 +1657618343175325789:1089 149 -28 110 -102 28 31 -11 6 62 -8 +1676918088588087334:5 -27 29 27 -6 -5 -12 -2 -4 -24 +1657827627464525106:236 229 -8 -42 124 -226 -307 179 40 -101 -162 +1657635389944349858:2149 1114 -33 -74 94 83 -53 -61 -104 -120 11 +1657837006931146296:28 16 -6 2 -15 -30 4 27 -24 13 34 -- GitLab