par.rs 3.3 KB
Newer Older
E
Eric Holk 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
import future::future;

export map, mapi, alli, any;

#[doc="The maximum number of tasks this module will spawn for a single
operation."]
const max_tasks : uint = 32u;

// map_slices handles vectors shorter than this with one direct call of the
// supplied function instead of spawning tasks, and uses
// len / min_granularity as an upper bound on the number of tasks.
#[doc="The minimum number of elements each task will process."]
const min_granularity : uint = 1024u;

#[doc="An internal helper to map a function over a large vector and
return the intermediate results.

The vector is cut into contiguous slices. `f` is called once per slice
with the index of the slice's first element and the slice itself, and
the per-slice results are returned in slice order.

Vectors shorter than `min_granularity` are passed to a single direct
call of `f`. Longer vectors are split across futures created with
`future::spawn`, never more than `max_tasks` of them.

This is used to build most of the other parallel vector functions,
like map or alli."]
fn map_slices<A: copy send, B: copy send>(xs: [A],
                                          f: fn~(uint, [const A]/&) -> B)
    -> [B] {

    let len = xs.len();
    if len < min_granularity {
        log(info, "small slice");
        // This is a small vector, fall back on the normal map.
        [f(0u, xs)]
    }
    else {
        let num_tasks = uint::min(max_tasks, len / min_granularity);

        // Round the slice size up. Rounding down left a remainder whenever
        // len was not a multiple of num_tasks; the loop below then spawned
        // an extra future for it, exceeding num_tasks and failing the
        // assertion after the loop.
        let items_per_task = (len + num_tasks - 1u) / num_tasks;

        let mut futures = [];
        let mut base = 0u;
        log(info, "spawning tasks");
        while base < len {
            // The last slice may be shorter than items_per_task.
            let end = uint::min(len, base + items_per_task);
            // FIXME: why is the ::<A, ()> annotation required here?
            vec::unpack_slice::<A, ()>(xs) {|p, _len|
                // The future receives raw pointers to f and to the vector
                // data. NOTE(review): this presumably relies on xs and f
                // outliving every future, which holds as long as all
                // futures are waited on before returning -- confirm.
                let f = ptr::addr_of(f);
                futures += [future::spawn() {|copy base|
                    unsafe {
                        let len = end - base;
                        // Build the slice by hand as a (pointer, length in
                        // bytes) pair and reinterpret it as a borrowed
                        // vector slice.
                        let slice = (ptr::offset(p, base),
                                     len * sys::size_of::<A>());
                        log(info, #fmt("pre-slice: %?", (base, slice)));
                        let slice : [const A]/& =
                            unsafe::reinterpret_cast(slice);
                        log(info, #fmt("slice: %?",
                                       (base, vec::len(slice), end - base)));
                        assert(vec::len(slice) == end - base);
                        (*f)(base, slice)
                    }
                }];
            };
            base += items_per_task;
        }
        log(info, "tasks spawned");

        log(info, #fmt("num_tasks: %?", (num_tasks, futures.len())));
        // With the rounded-up slice size the number of slices can never
        // exceed num_tasks (and therefore never exceeds max_tasks).
        assert(futures.len() <= num_tasks);

        // Wait for every future, collecting results in slice order.
        let r = futures.map() {|ys|
            ys.get()
        };
        assert(r.len() == futures.len());
        r
    }
}

#[doc="A parallel version of map: applies `f` to each element one slice
at a time through map_slices and joins the per-slice results."]
fn map<A: copy send, B: copy send>(xs: [A], f: fn~(A) -> B) -> [B] {
    // One result vector per slice, in slice order.
    let chunks = map_slices(xs) {|_offset, part|
        vec::map(part, f)
    };
    vec::concat(chunks)
}

#[doc="A parallel version of mapi: like map, but `f` also receives the
index of each element within the whole vector."]
fn mapi<A: copy send, B: copy send>(xs: [A], f: fn~(uint, A) -> B) -> [B] {
    let r = vec::concat(map_slices(xs) {|offset, part|
        // vec::mapi counts from the start of the slice; add the slice's
        // starting offset to recover the position in xs.
        vec::mapi(part) {|j, elt|
            f(offset + j, elt)
        }
    });
    log(info, (r.len(), xs.len()));
    // Every input element must have produced exactly one output element.
    assert(r.len() == xs.len());
    r
}

#[doc="Returns true if `f` holds for every (index, element) pair in the
vector, where the index is the element's position in the whole vector."]
fn alli<A: copy send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
    // One verdict per slice: did f hold for every element of that slice?
    let verdicts = map_slices(xs) {|offset, part|
        vec::alli(part) {|j, elt|
            f(offset + j, elt)
        }
    };
    // The whole vector passes only if every slice passed.
    vec::all(verdicts) {|ok| ok }
}

#[doc="Returns true if `f` holds for at least one element in the vector."]
fn any<A: copy send>(xs: [A], f: fn~(A) -> bool) -> bool {
    // One flag per slice: did any element of that slice satisfy f?
    let hits = map_slices(xs) {|_offset, part|
        vec::any(part, f)
    };
    // A hit in any slice is a hit for the whole vector.
    vec::any(hits) {|found| found }
}