par.rs 4.3 KB
Newer Older
E
Eric Holk 已提交
1 2 3 4
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
5
import future_spawn = future::spawn;
6
import future::extensions;
7
import core::vec::extensions;
E
Eric Holk 已提交
8

9
export map, mapi, alli, any, mapi_factory;
E
Eric Holk 已提交
10

11 12 13 14
/**
 * The maximum number of tasks this module will spawn for a single
 * operation.
 *
 * `map_slices` uses the smaller of this value and
 * `len / min_granularity` as its task count.
 */
E
Eric Holk 已提交
15 16
const max_tasks : uint = 32u;

17
/// The minimum number of elements each task will process.
///
/// Vectors shorter than this are handled by a single direct call in
/// `map_slices`, without spawning any tasks; for longer vectors it is
/// the divisor used to decide how many tasks to spawn.
E
Eric Holk 已提交
18 19
const min_granularity : uint = 1024u;

20 21 22 23 24 25 26
/**
 * An internal helper to map a function over a large vector and
 * return the intermediate results.
 *
 * This is used to build most of the other parallel vector functions,
 * like map or alli.
 */
27
fn map_slices<A: copy send, B: copy send>(
28 29 30
    xs: ~[A],
    f: fn() -> fn~(uint, v: &[A]) -> B)
    -> ~[B] {
E
Eric Holk 已提交
31 32 33 34 35

    let len = xs.len();
    if len < min_granularity {
        log(info, "small slice");
        // This is a small vector, fall back on the normal map.
36
        ~[f()(0u, xs)]
E
Eric Holk 已提交
37 38 39 40 41 42
    }
    else {
        let num_tasks = uint::min(max_tasks, len / min_granularity);

        let items_per_task = len / num_tasks;

43
        let mut futures = ~[];
E
Eric Holk 已提交
44 45 46 47
        let mut base = 0u;
        log(info, "spawning tasks");
        while base < len {
            let end = uint::min(len, base + items_per_task);
T
Tim Chevalier 已提交
48
            // FIXME: why is the ::<A, ()> annotation required here? (#2617)
B
Brian Anderson 已提交
49
            do vec::unpack_slice::<A, ()>(xs) |p, _len| {
50
                let f = f();
B
Brian Anderson 已提交
51
                let f = do future_spawn() |copy base| {
E
Eric Holk 已提交
52 53 54 55 56
                    unsafe {
                        let len = end - base;
                        let slice = (ptr::offset(p, base),
                                     len * sys::size_of::<A>());
                        log(info, #fmt("pre-slice: %?", (base, slice)));
57
                        let slice : &[A] =
E
Eric Holk 已提交
58 59 60 61
                            unsafe::reinterpret_cast(slice);
                        log(info, #fmt("slice: %?",
                                       (base, vec::len(slice), end - base)));
                        assert(vec::len(slice) == end - base);
62
                        f(base, slice)
E
Eric Holk 已提交
63
                    }
64 65
                };
                vec::push(futures, f);
E
Eric Holk 已提交
66 67 68 69 70 71 72 73
            };
            base += items_per_task;
        }
        log(info, "tasks spawned");

        log(info, #fmt("num_tasks: %?", (num_tasks, futures.len())));
        assert(num_tasks == futures.len());

B
Brian Anderson 已提交
74
        let r = do futures.map() |ys| {
E
Eric Holk 已提交
75 76 77 78 79 80 81
            ys.get()
        };
        assert(r.len() == futures.len());
        r
    }
}

82
/// A parallel version of map.
83
fn map<A: copy send, B: copy send>(xs: ~[A], f: fn~(A) -> B) -> ~[B] {
B
Brian Anderson 已提交
84
    vec::concat(map_slices(xs, || {
85
        fn~(_base: uint, slice : &[A], copy f) -> ~[B] {
86 87
            vec::map(slice, f)
        }
88
    }))
E
Eric Holk 已提交
89 90
}

91
/// A parallel version of mapi.
92 93
fn mapi<A: copy send, B: copy send>(xs: ~[A],
                                    f: fn~(uint, A) -> B) -> ~[B] {
B
Brian Anderson 已提交
94
    let slices = map_slices(xs, || {
95
        fn~(base: uint, slice : &[A], copy f) -> ~[B] {
B
Brian Anderson 已提交
96
            vec::mapi(slice, |i, x| {
97
                f(i + base, x)
98
            })
99
        }
100
    });
101 102 103 104 105 106
    let r = vec::concat(slices);
    log(info, (r.len(), xs.len()));
    assert(r.len() == xs.len());
    r
}

107 108 109 110 111 112
/**
 * A parallel version of mapi.
 *
 * In this case, f is a function that creates functions to run over the
 * inner elements. This is to skirt the need for copy constructors.
 */
113
fn mapi_factory<A: copy send, B: copy send>(
114
    xs: ~[A], f: fn() -> fn~(uint, A) -> B) -> ~[B] {
B
Brian Anderson 已提交
115
    let slices = map_slices(xs, || {
116
        let f = f();
117
        fn~(base: uint, slice : &[A], move f) -> ~[B] {
B
Brian Anderson 已提交
118
            vec::mapi(slice, |i, x| {
119
                f(i + base, x)
120
            })
E
Eric Holk 已提交
121
        }
122
    });
E
Eric Holk 已提交
123 124 125 126 127 128
    let r = vec::concat(slices);
    log(info, (r.len(), xs.len()));
    assert(r.len() == xs.len());
    r
}

129
/// Returns true if the function holds for all elements in the vector.
130
fn alli<A: copy send>(xs: ~[A], f: fn~(uint, A) -> bool) -> bool {
B
Brian Anderson 已提交
131
    do vec::all(map_slices(xs, || {
132
        fn~(base: uint, slice : &[A], copy f) -> bool {
B
Brian Anderson 已提交
133
            vec::alli(slice, |i, x| {
E
Eric Holk 已提交
134
                f(i + base, x)
135
            })
E
Eric Holk 已提交
136
        }
B
Brian Anderson 已提交
137
    })) |x| { x }
E
Eric Holk 已提交
138 139
}

140
/// Returns true if the function holds for any elements in the vector.
141
fn any<A: copy send>(xs: ~[A], f: fn~(A) -> bool) -> bool {
B
Brian Anderson 已提交
142
    do vec::any(map_slices(xs, || {
143
        fn~(_base : uint, slice: &[A], copy f) -> bool {
E
Eric Holk 已提交
144 145
            vec::any(slice, f)
        }
B
Brian Anderson 已提交
146
    })) |x| { x }
E
Eric Holk 已提交
147
}