Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Serving
提交
50b67b19
S
Serving
项目概览
PaddlePaddle
/
Serving
大约 2 年 前同步成功
通知
187
Star
833
Fork
253
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
105
列表
看板
标记
里程碑
合并请求
10
Wiki
2
Wiki
分析
仓库
DevOps
项目成员
Pages
S
Serving
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
105
Issue
105
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
2
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
50b67b19
编写于
8月 07, 2019
作者:
D
dangyifei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add kv to seqfile and read seqfile tool
上级
d2291a46
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
367 addition
and
0 deletion
+367
-0
cube/cube-builder/CMakeLists.txt
cube/cube-builder/CMakeLists.txt
+2
-0
cube/cube-builder/tool/kv_to_seqfile.py
cube/cube-builder/tool/kv_to_seqfile.py
+73
-0
cube/cube-builder/tool/kvtool.py
cube/cube-builder/tool/kvtool.py
+274
-0
cube/cube-builder/tool/source/file.txt
cube/cube-builder/tool/source/file.txt
+18
-0
未找到文件。
cube/cube-builder/CMakeLists.txt
浏览文件 @
50b67b19
...
...
@@ -47,3 +47,5 @@ target_link_libraries(cube-builder ${DYNAMIC_LIB})
# install
install
(
TARGETS cube-builder RUNTIME DESTINATION
${
PADDLE_SERVING_INSTALL_DIR
}
/bin
)
install
(
DIRECTORY
${
CMAKE_CURRENT_LIST_DIR
}
/tool DESTINATION
${
PADDLE_SERVING_INSTALL_DIR
}
)
cube/cube-builder/tool/kv_to_seqfile.py
0 → 100644
浏览文件 @
50b67b19
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import
struct
import
time
import
datetime
import
json
from
collections
import
OrderedDict
import
os
from
kvtool
import
SequenceFileWriter
NOW_TIMESTAMP
=
time
.
time
()
NOW_DATETIME
=
datetime
.
datetime
.
now
().
strftime
(
"%Y%m%d%H%M%S"
)
DATA_PATH
=
"./data/"
BASE_DATAFILE
=
DATA_PATH
+
NOW_DATETIME
+
"/base/feature"
BASE_DONEFILE
=
DATA_PATH
+
"donefile/base.txt"
PATCH_DATAFILE
=
"./donefile/"
+
NOW_DATETIME
+
"/patch/feature"
PATCH_DONEFILE
=
DATA_PATH
+
"donefile/patch.txt"
SOURCE_FILE
=
'./source/file.txt'
def
write_donefile
():
dict
=
OrderedDict
()
dict
[
"id"
]
=
str
(
int
(
NOW_TIMESTAMP
))
dict
[
"key"
]
=
dict
[
"id"
]
dict
[
"input"
]
=
os
.
path
.
abspath
(
os
.
path
.
dirname
(
BASE_DATAFILE
))
if
not
os
.
access
(
os
.
path
.
dirname
(
BASE_DONEFILE
),
os
.
F_OK
):
os
.
makedirs
(
os
.
path
.
dirname
(
BASE_DONEFILE
))
with
open
(
BASE_DONEFILE
,
"a+"
)
as
f
:
f
.
write
(
json
.
dumps
(
dict
)
+
'
\n
'
)
f
.
close
()
def
kv_to_seqfile
():
if
not
os
.
access
(
os
.
path
.
dirname
(
BASE_DATAFILE
),
os
.
F_OK
):
os
.
makedirs
(
os
.
path
.
dirname
(
BASE_DATAFILE
))
with
open
(
BASE_DATAFILE
,
"wb"
)
as
f
:
writer
=
SequenceFileWriter
(
f
)
res
=
[]
fp
=
open
(
SOURCE_FILE
,
'r'
)
try
:
lines
=
fp
.
readlines
()
finally
:
fp
.
close
()
for
line
in
lines
:
line_list
=
line
.
split
(
':'
)
key
=
int
(
line_list
[
0
])
value
=
str
(
line_list
[
1
]).
replace
(
'
\n
'
,
''
)
res
.
append
(
dict
)
key_bytes
=
struct
.
pack
(
'Q'
,
key
)
row_bytes
=
struct
.
pack
(
'%ss'
%
len
(
value
),
value
)
print
key
,
':'
,
value
,
'->'
,
key_bytes
,
':'
,
row_bytes
writer
.
write
(
key_bytes
,
row_bytes
)
f
.
close
()
write_donefile
()
if
__name__
==
'__main__'
:
"""
tran kv to sequence file and auto create donefile file, you can modify source file addr,output addr in line 25-32
"""
kv_to_seqfile
()
cube/cube-builder/tool/kvtool.py
0 → 100644
浏览文件 @
50b67b19
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import
sys
import
struct
import
random
class
Stream
(
object
):
""" bytes stream like sys.stdin
"""
def
__init__
(
self
,
source
=
None
,
cache
=
None
):
""" init
"""
self
.
_src
=
source
self
.
_cache_to
=
cache
self
.
_cache_fd
=
None
def
read_bytes
(
self
,
num
):
"""read bytes"""
data
=
self
.
_src
.
read
(
num
)
if
len
(
data
)
<
num
:
if
self
.
_cache_fd
is
not
None
:
if
len
(
data
)
>
0
:
self
.
_cache_fd
.
write
(
data
)
self
.
_cache_fd
.
close
()
print
>>
sys
.
stderr
,
'succeed to cache file[%s]'
%
(
self
.
_cache_to
)
self
.
_cache_fd
=
None
raise
EOFError
else
:
if
self
.
_cache_to
is
not
None
:
if
self
.
_cache_fd
is
None
:
self
.
_cache_fd
=
open
(
self
.
_cache_to
,
'wb'
)
self
.
_cache_fd
.
write
(
data
)
return
data
def
read_int
(
self
):
"""read int"""
data
=
self
.
read_bytes
(
4
)
return
struct
.
unpack
(
'!i'
,
data
)[
0
]
def
read_byte
(
self
):
"""read byte"""
byte
=
self
.
read_bytes
(
1
)
return
struct
.
unpack
(
'!b'
,
byte
)[
0
]
def
read_string
(
self
):
"""read string"""
str_len
=
self
.
read_vint
()
return
unicode
(
self
.
read_bytes
(
str_len
),
'utf-8'
)
def
read_bool
(
self
):
"""read bool"""
return
bool
(
self
.
read_byte
())
def
read_vint
(
self
):
"""read vint"""
first
=
self
.
read_byte
()
l
=
self
.
_decode_vint_size
(
first
)
if
l
==
1
:
return
first
x
=
0
for
i
in
range
(
l
-
1
):
b
=
self
.
read_byte
()
x
=
x
<<
8
x
=
x
|
(
b
&
0xFF
)
if
self
.
_is_negative_vint
(
first
):
return
x
^
-
1
return
x
def
_is_negative_vint
(
self
,
val
):
"""check is negative vint"""
return
val
<
-
120
or
(
val
>=
-
122
and
val
<
0
)
def
_decode_vint_size
(
self
,
val
):
"""decode vint size"""
if
val
>=
-
122
:
return
1
elif
val
<
-
120
:
return
-
119
-
val
return
-
111
-
val
def
tell
(
self
):
""" tell """
return
self
.
_src
.
tell
()
def
seek
(
self
,
pos
):
""" seek """
self
.
_src
.
seek
(
pos
)
class
SequenceFileReader
():
""" a reader for sequencefile
"""
def
__init__
(
self
,
seqfile
=
None
,
cache
=
None
):
""" init
"""
self
.
type
=
'seqfile'
if
seqfile
is
None
:
seqfile
=
sys
.
stdin
self
.
stream
=
Stream
(
seqfile
,
cache
=
cache
)
self
.
version
=
None
# self.key_class = None
self
.
compression_class
=
None
# self.value_class = None
self
.
compression
=
False
self
.
block_compression
=
False
self
.
meta
=
{}
self
.
sync
=
None
self
.
_read_header
()
if
self
.
compression
or
self
.
block_compression
:
raise
NotImplementedError
(
"reading of seqfiles with compression is not implemented."
)
def
_read_header
(
self
):
""" read sequencefile header
"""
stream
=
self
.
stream
seq
=
stream
.
read_bytes
(
3
)
if
seq
!=
"SEQ"
:
raise
ValueError
(
"given file is not a sequence-file"
)
self
.
version
=
stream
.
read_byte
()
self
.
key_class
=
stream
.
read_string
()
self
.
value_class
=
stream
.
read_string
()
self
.
compression
=
stream
.
read_bool
()
self
.
block_compression
=
stream
.
read_bool
()
if
self
.
compression
:
self
.
compression_class
=
stream
.
read_string
()
meta_len
=
stream
.
read_int
()
for
i
in
range
(
meta_len
):
key
=
stream
.
read_string
()
val
=
stream
.
read_string
()
self
.
meta
[
key
]
=
val
self
.
sync
=
stream
.
read_bytes
(
16
)
def
__iter__
(
self
):
""" facilitate 'for i in reader:'
"""
while
True
:
try
:
next
=
self
.
load
()
except
EOFError
:
raise
StopIteration
yield
next
def
get_type
(
self
):
""" get type of this reader
"""
return
self
.
type
def
load
(
self
):
""" read one record
"""
stream
=
self
.
stream
buf_len
=
stream
.
read_int
()
if
buf_len
==
-
1
:
syncCheck
=
stream
.
read_bytes
(
16
)
if
syncCheck
!=
self
.
sync
:
raise
ValueError
(
"file corrupt, no a valid sequencefile"
)
buf_len
=
stream
.
read_int
()
key_len
=
stream
.
read_int
()
buf
=
stream
.
read_bytes
(
buf_len
)
return
buf
[:
key_len
],
buf
[
key_len
:]
def
tell
(
self
):
""" tell the position of currently readed
"""
return
self
.
stream
.
tell
()
def
seek
(
self
,
pos
):
""" seek to the specified position
"""
self
.
stream
.
seek
(
pos
)
class
SequenceFileWriter
(
object
):
"""A wrapper around file-like object for aid writing sequence files
"""
# sequence file header for uncompressed, version 6 sequence files
SEQ_HEADER
=
"SEQ
\x06
"
\
"
\"
org.apache.hadoop.io.BytesWritable
\"
"
\
"org.apache.hadoop.io.BytesWritable"
\
"
\x00\x00\x00\x00\x00\x00
"
# after writing how many bytes of actual data we insert a sync marker
SYNC_INTERVAL
=
2000
def
__init__
(
self
,
f
):
""" Construct a sequencefile writer for specified file-like object f
"""
self
.
_f
=
f
self
.
_sync_marker
=
''
.
join
(
[
chr
(
random
.
randint
(
0
,
255
))
for
k
in
range
(
0
,
16
)])
self
.
_write_seq_header
()
self
.
_bytes_to_prev_sync
=
0
def
write
(
self
,
key
,
value
):
"""Write key-value record to this sequence file
Args:
key: key of this record, should be a str
value: value of this record, should be a str
Returns:
number of bytes written
"""
key_len
=
len
(
key
)
record_len
=
key_len
+
len
(
value
)
b_record_len
=
struct
.
pack
(
'>I'
,
record_len
)
b_key_len
=
struct
.
pack
(
'>I'
,
key_len
)
self
.
_f
.
write
(
b_record_len
+
b_key_len
)
self
.
_f
.
write
(
key
)
self
.
_f
.
write
(
value
)
self
.
_bytes_to_prev_sync
+=
record_len
if
self
.
_bytes_to_prev_sync
>=
SequenceFileWriter
.
SYNC_INTERVAL
:
self
.
_write_sync_marker
()
self
.
_bytes_to_prev_sync
=
0
def
_write_seq_header
(
self
):
"""Write sequence file header to the underlying file
"""
self
.
_f
.
write
(
SequenceFileWriter
.
SEQ_HEADER
)
self
.
_f
.
write
(
self
.
_sync_marker
)
def
_write_sync_marker
(
self
):
"""Write sync marker to this sequence file
"""
self
.
_f
.
write
(
"
\xff\xff\xff\xff
"
)
self
.
_f
.
write
(
self
.
_sync_marker
)
def
get_reader
(
f
=
None
,
cache
=
None
):
""" get a kv reader for a stream 'f'
and the type can be 'kvfile' or 'seqfile'
"""
return
SequenceFileReader
(
f
,
cache
=
cache
)
def
test_reader
(
file_path
):
""" test reader of sequencefile
"""
seqfile
=
file_path
f
=
open
(
seqfile
,
'rb'
)
reader
=
get_reader
(
f
)
# reader = get_reader(sys.stdin, filetype)
ct
=
0
for
r
in
reader
:
ct
+=
1
k
,
v
=
r
if
ct
%
2
==
0
:
print
struct
.
unpack
(
'Q'
,
k
)[
0
],
v
print
"read a record with klen:%d,vlen:%d with count:%d"
\
%
(
len
(
k
),
len
(
v
),
ct
)
if
__name__
==
"__main__"
:
""" read sequence file to kv, need a param sequence file addr
"""
if
len
(
sys
.
argv
)
!=
2
:
print
"error, usage:python kvtool.py seqfile_path"
else
:
file_path
=
sys
.
argv
[
1
]
test_reader
(
file_path
)
cube/cube-builder/tool/source/file.txt
0 → 100644
浏览文件 @
50b67b19
1:1
2:2
3:3
4:4
5:5
7:7
8:8
9:9
10:10 11 12
11:this is eleven
12:value can string
1676869128226002114:48241 37064 91 -539 114 51 -122 269 229 -134 -282
1657749292782759014:167 40 98 27 117 10 -29 15 74 67 -54
1657618343175325789:1089 149 -28 110 -102 28 31 -11 6 62 -8
1676918088588087334:5 -27 29 27 -6 -5 -12 -2 -4 -24
1657827627464525106:236 229 -8 -42 124 -226 -307 179 40 -101 -162
1657635389944349858:2149 1114 -33 -74 94 83 -53 -61 -104 -120 11
1657837006931146296:28 16 -6 2 -15 -30 4 27 -24 13 34
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录