par.rs 4.1 KB
Newer Older
P
Patrick Walton 已提交
1
use future_spawn = future::spawn;
E
Eric Holk 已提交
2

3
export map, mapi, alli, any, mapi_factory;
E
Eric Holk 已提交
4

5 6 7 8
/**
 * The maximum number of tasks this module will spawn for a single
 * operation.
 */
E
Eric Holk 已提交
9 10
const max_tasks : uint = 32u;

11
/// The minimum number of elements each task will process.
E
Eric Holk 已提交
12 13
const min_granularity : uint = 1024u;

14 15 16 17 18 19 20
/**
 * An internal helper to map a function over a large vector and
 * return the intermediate results.
 *
 * This is used to build most of the other parallel vector functions,
 * like map or alli.
 */
21
fn map_slices<A: copy send, B: copy send>(
22 23 24
    xs: ~[A],
    f: fn() -> fn~(uint, v: &[A]) -> B)
    -> ~[B] {
E
Eric Holk 已提交
25 26 27

    let len = xs.len();
    if len < min_granularity {
28
        log(info, ~"small slice");
E
Eric Holk 已提交
29
        // This is a small vector, fall back on the normal map.
30
        ~[f()(0u, xs)]
E
Eric Holk 已提交
31 32
    }
    else {
33
        let num_tasks = uint::min(max_tasks, len / min_granularity);
E
Eric Holk 已提交
34 35 36

        let items_per_task = len / num_tasks;

37
        let mut futures = ~[];
E
Eric Holk 已提交
38
        let mut base = 0u;
39
        log(info, ~"spawning tasks");
E
Eric Holk 已提交
40
        while base < len {
41
            let end = uint::min(len, base + items_per_task);
42
            do vec::as_buf(xs) |p, _len| {
43
                let f = f();
B
Brian Anderson 已提交
44
                let f = do future_spawn() |copy base| {
E
Eric Holk 已提交
45 46 47 48
                    unsafe {
                        let len = end - base;
                        let slice = (ptr::offset(p, base),
                                     len * sys::size_of::<A>());
P
Paul Stansifer 已提交
49
                        log(info, fmt!("pre-slice: %?", (base, slice)));
50
                        let slice : &[A] =
B
Brian Anderson 已提交
51
                            unsafe::reinterpret_cast(&slice);
P
Paul Stansifer 已提交
52 53
                        log(info, fmt!("slice: %?",
                                       (base, vec::len(slice), end - base)));
E
Eric Holk 已提交
54
                        assert(vec::len(slice) == end - base);
55
                        f(base, slice)
E
Eric Holk 已提交
56
                    }
57 58
                };
                vec::push(futures, f);
E
Eric Holk 已提交
59 60 61
            };
            base += items_per_task;
        }
62
        log(info, ~"tasks spawned");
E
Eric Holk 已提交
63

P
Paul Stansifer 已提交
64
        log(info, fmt!("num_tasks: %?", (num_tasks, futures.len())));
E
Eric Holk 已提交
65 66
        assert(num_tasks == futures.len());

B
Brian Anderson 已提交
67
        let r = do futures.map() |ys| {
E
Eric Holk 已提交
68 69 70 71 72 73 74
            ys.get()
        };
        assert(r.len() == futures.len());
        r
    }
}

75
/// A parallel version of map.
76
fn map<A: copy send, B: copy send>(xs: ~[A], f: fn~(A) -> B) -> ~[B] {
B
Brian Anderson 已提交
77
    vec::concat(map_slices(xs, || {
78
        fn~(_base: uint, slice : &[A], copy f) -> ~[B] {
79
            vec::map(slice, |x| f(x))
80
        }
81
    }))
E
Eric Holk 已提交
82 83
}

84
/// A parallel version of mapi.
85 86
fn mapi<A: copy send, B: copy send>(xs: ~[A],
                                    f: fn~(uint, A) -> B) -> ~[B] {
B
Brian Anderson 已提交
87
    let slices = map_slices(xs, || {
88
        fn~(base: uint, slice : &[A], copy f) -> ~[B] {
B
Brian Anderson 已提交
89
            vec::mapi(slice, |i, x| {
90
                f(i + base, x)
91
            })
92
        }
93
    });
94 95 96 97 98 99
    let r = vec::concat(slices);
    log(info, (r.len(), xs.len()));
    assert(r.len() == xs.len());
    r
}

100 101 102 103 104 105
/**
 * A parallel version of mapi.
 *
 * In this case, f is a function that creates functions to run over the
 * inner elements. This is to skirt the need for copy constructors.
 */
106
fn mapi_factory<A: copy send, B: copy send>(
107
    xs: ~[A], f: fn() -> fn~(uint, A) -> B) -> ~[B] {
B
Brian Anderson 已提交
108
    let slices = map_slices(xs, || {
109
        let f = f();
110
        fn~(base: uint, slice : &[A], move f) -> ~[B] {
B
Brian Anderson 已提交
111
            vec::mapi(slice, |i, x| {
112
                f(i + base, x)
113
            })
E
Eric Holk 已提交
114
        }
115
    });
E
Eric Holk 已提交
116 117 118 119 120 121
    let r = vec::concat(slices);
    log(info, (r.len(), xs.len()));
    assert(r.len() == xs.len());
    r
}

122
/// Returns true if the function holds for all elements in the vector.
123
fn alli<A: copy send>(xs: ~[A], f: fn~(uint, A) -> bool) -> bool {
B
Brian Anderson 已提交
124
    do vec::all(map_slices(xs, || {
125
        fn~(base: uint, slice : &[A], copy f) -> bool {
B
Brian Anderson 已提交
126
            vec::alli(slice, |i, x| {
E
Eric Holk 已提交
127
                f(i + base, x)
128
            })
E
Eric Holk 已提交
129
        }
B
Brian Anderson 已提交
130
    })) |x| { x }
E
Eric Holk 已提交
131 132
}

133
/// Returns true if the function holds for any elements in the vector.
134
fn any<A: copy send>(xs: ~[A], f: fn~(A) -> bool) -> bool {
B
Brian Anderson 已提交
135
    do vec::any(map_slices(xs, || {
136
        fn~(_base : uint, slice: &[A], copy f) -> bool {
137
            vec::any(slice, |x| f(x))
E
Eric Holk 已提交
138
        }
B
Brian Anderson 已提交
139
    })) |x| { x }
E
Eric Holk 已提交
140
}