Commit 28452e1e authored by michaelm

Merge

/*
* Copyright (c) 1998, 2005, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2010, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -133,27 +133,60 @@ set_event_notification(jvmtiEventMode mode, EventIndex ei)
return error;
}
typedef struct {
int major;
int minor;
} version_type;
typedef struct {
version_type runtime;
version_type compiletime;
} compatible_versions_type;
/*
* List of explicitly compatible JVMTI versions, specified as
* { runtime version, compile-time version } pairs. -1 is a wildcard.
*/
static int nof_compatible_versions = 3;
static compatible_versions_type compatible_versions_list[] = {
/*
* FIXUP: Allow version 0 to be compatible with anything
* Special check for FCS of 1.0.
*/
{ { 0, -1 }, { -1, -1 } },
{ { -1, -1 }, { 0, -1 } },
/*
* 1.2 is runtime compatible with 1.1 -- just make sure to check the
* version before using any new 1.2 features
*/
{ { 1, 1 }, { 1, 2 } }
};
/* Logic to determine JVMTI version compatibility */
static jboolean
compatible_versions(jint major_runtime, jint minor_runtime,
jint major_compiletime, jint minor_compiletime)
{
#if 1 /* FIXUP: We allow version 0 to be compatible with anything */
/* Special check for FCS of 1.0. */
if ( major_runtime == 0 || major_compiletime == 0 ) {
return JNI_TRUE;
}
#endif
/* Runtime major version must match. */
if ( major_runtime != major_compiletime ) {
return JNI_FALSE;
}
/* Runtime minor version must be >= the version compiled with. */
if ( minor_runtime < minor_compiletime ) {
return JNI_FALSE;
/*
* First check to see if versions are explicitly compatible via the
* list specified above.
*/
int i;
for (i = 0; i < nof_compatible_versions; ++i) {
version_type runtime = compatible_versions_list[i].runtime;
version_type comptime = compatible_versions_list[i].compiletime;
if ((major_runtime == runtime.major || runtime.major == -1) &&
(minor_runtime == runtime.minor || runtime.minor == -1) &&
(major_compiletime == comptime.major || comptime.major == -1) &&
(minor_compiletime == comptime.minor || comptime.minor == -1)) {
return JNI_TRUE;
}
}
/* Assumed compatible */
return JNI_TRUE;
return major_runtime == major_compiletime &&
minor_runtime >= minor_compiletime;
}
/* OnLoad startup:
......
/*
* Copyright (c) 2001, 2008, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -39,6 +39,7 @@
#include "stepControl.h"
#include "threadControl.h"
#include "SDE.h"
#include "jvmti.h"
typedef struct ClassFilter {
jclass clazz;
......@@ -275,6 +276,24 @@ patternStringMatch(char *classname, const char *pattern)
}
}
static jboolean isVersionGte12x() {
jint version;
jvmtiError err =
JVMTI_FUNC_PTR(gdata->jvmti,GetVersionNumber)(gdata->jvmti, &version);
if (err == JVMTI_ERROR_NONE) {
jint major, minor;
major = (version & JVMTI_VERSION_MASK_MAJOR)
>> JVMTI_VERSION_SHIFT_MAJOR;
minor = (version & JVMTI_VERSION_MASK_MINOR)
>> JVMTI_VERSION_SHIFT_MINOR;
return (major > 1 || major == 1 && minor >= 2);
} else {
return JNI_FALSE;
}
}
/* Return the object instance in which the event occurred */
/* Return NULL if static or if an error occurs */
static jobject
......@@ -286,6 +305,14 @@ eventInstance(EventInfo *evinfo)
jint modifiers = 0;
jvmtiError error;
static jboolean got_version = JNI_FALSE;
static jboolean is_version_gte_12x = JNI_FALSE;
if (!got_version) {
is_version_gte_12x = isVersionGte12x();
got_version = JNI_TRUE;
}
switch (evinfo->ei) {
case EI_SINGLE_STEP:
case EI_BREAKPOINT:
......@@ -314,11 +341,18 @@ eventInstance(EventInfo *evinfo)
/* fail if error or static (0x8) */
if (error == JVMTI_ERROR_NONE && thread!=NULL && (modifiers & 0x8) == 0) {
FrameNumber fnum = 0;
/* get slot zero object "this" */
error = JVMTI_FUNC_PTR(gdata->jvmti,GetLocalObject)
(gdata->jvmti, thread, fnum, 0, &object);
if (error != JVMTI_ERROR_NONE)
if (is_version_gte_12x) {
/* Use new 1.2.x function, GetLocalInstance */
error = JVMTI_FUNC_PTR(gdata->jvmti,GetLocalInstance)
(gdata->jvmti, thread, fnum, &object);
} else {
/* get slot zero object "this" */
error = JVMTI_FUNC_PTR(gdata->jvmti,GetLocalObject)
(gdata->jvmti, thread, fnum, 0, &object);
}
if (error != JVMTI_ERROR_NONE) {
object = NULL;
}
}
return object;
......
......@@ -1704,7 +1704,7 @@ class BandStructure {
for (int i = 0; i < ATTR_CONTEXT_LIMIT; i++) {
assert(attrIndexLimit[i] == 0);
attrIndexLimit[i] = 32; // just for the sake of predefs.
attrDefs.set(i, new ArrayList<>(Collections.nCopies(
attrDefs.set(i, new ArrayList<Attribute.Layout>(Collections.nCopies(
attrIndexLimit[i], (Attribute.Layout)null)));
}
......
......@@ -329,7 +329,7 @@ public class ObjectStreamClass implements Serializable {
entry = th;
}
if (future.set(entry)) {
Caches.localDescs.put(key, new SoftReference<>(entry));
Caches.localDescs.put(key, new SoftReference<Object>(entry));
} else {
// nested lookup call already set future
entry = future.get();
......@@ -2118,7 +2118,7 @@ public class ObjectStreamClass implements Serializable {
entry = th;
}
future.set(entry);
Caches.reflectors.put(key, new SoftReference<>(entry));
Caches.reflectors.put(key, new SoftReference<Object>(entry));
}
if (entry instanceof FieldReflector) {
......
......@@ -67,7 +67,7 @@ class StringCoding {
}
private static <T> void set(ThreadLocal<SoftReference<T>> tl, T ob) {
tl.set(new SoftReference<>(ob));
tl.set(new SoftReference<T>(ob));
}
// Trim the given byte array to the given length
......
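Several of the hunks above (BandStructure, ObjectStreamClass, StringCoding) replace the diamond operator with explicit type arguments. A minimal sketch of the two equivalent spellings, assuming the change is only about writing out the type argument the diamond would otherwise infer:

import java.util.ArrayList;
import java.util.List;

public class DiamondExample {
    public static void main(String[] args) {
        List<String> a = new ArrayList<>();          // diamond form, relies on type inference
        List<String> b = new ArrayList<String>();    // explicit type arguments, as used in this change
        a.add("x");
        b.add("y");
        System.out.println(a + " " + b);
    }
}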
......@@ -473,7 +473,9 @@ public class Timestamp extends java.util.Date {
* @since 1.4
*/
public int compareTo(Timestamp ts) {
int i = super.compareTo(ts);
long thisTime = this.getTime();
long anotherTime = ts.getTime();
int i = (thisTime<anotherTime ? -1 :(thisTime==anotherTime?0 :1));
if (i == 0) {
if (nanos > ts.nanos) {
return 1;
......
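The Timestamp.compareTo change above compares the millisecond values directly and then lets the nanosecond fields break ties. A small usage sketch of that ordering (the values are illustrative only):

import java.sql.Timestamp;

public class TimestampCompareExample {
    public static void main(String[] args) {
        Timestamp a = new Timestamp(1000L);
        Timestamp b = new Timestamp(1000L);
        a.setNanos(500);                       // same millisecond value...
        b.setNanos(900);                       // ...different fractional seconds
        System.out.println(a.compareTo(b));    // negative: nanos break the tie
        System.out.println(a.compareTo(new Timestamp(2000L))); // negative: millis decide first
    }
}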
......@@ -1452,10 +1452,10 @@ public class Collections {
* when o is a Map.Entry, and calls o.setValue.
*/
public boolean containsAll(Collection<?> coll) {
Iterator<?> it = coll.iterator();
while (it.hasNext())
if (!contains(it.next())) // Invokes safe contains() above
for (Object e : coll) {
if (!contains(e)) // Invokes safe contains() above
return false;
}
return true;
}
public boolean equals(Object o) {
......
......@@ -26,9 +26,9 @@
package java.util;
/**
* Linked list implementation of the {@link List} and {@link Deque} interfaces.
* Implements all optional operations, and permits all elements (including
* {@code null}).
* Doubly-linked list implementation of the {@code List} and {@code Deque}
* interfaces. Implements all optional list operations, and permits all
* elements (including {@code null}).
*
* <p>All of the operations perform as could be expected for a doubly-linked
* list. Operations that index into the list will traverse the list from
......@@ -249,7 +249,7 @@ public class LinkedList<E>
* @return the last element in this list
* @throws NoSuchElementException if this list is empty
*/
public E getLast() {
public E getLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
......
......@@ -869,6 +869,8 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the front of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException}.
*
* @throws NullPointerException if the specified element is null
*/
......@@ -878,6 +880,8 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the end of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException}.
*
* <p>This method is equivalent to {@link #add}.
*
......@@ -889,8 +893,9 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the front of this deque.
* As the deque is unbounded, this method will never return {@code false}.
*
* @return {@code true} always
* @return {@code true} (as specified by {@link Deque#offerFirst})
* @throws NullPointerException if the specified element is null
*/
public boolean offerFirst(E e) {
......@@ -900,10 +905,11 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the end of this deque.
* As the deque is unbounded, this method will never return {@code false}.
*
* <p>This method is equivalent to {@link #add}.
*
* @return {@code true} always
* @return {@code true} (as specified by {@link Deque#offerLast})
* @throws NullPointerException if the specified element is null
*/
public boolean offerLast(E e) {
......@@ -983,6 +989,7 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
......@@ -993,6 +1000,8 @@ public class ConcurrentLinkedDeque<E>
/**
* Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
......
......@@ -269,6 +269,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
......@@ -298,6 +300,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
......
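The javadoc additions above for ConcurrentLinkedDeque and ConcurrentLinkedQueue spell out the consequence of the collections being unbounded: offer never returns false and add never throws IllegalStateException. A minimal illustration:

import java.util.concurrent.ConcurrentLinkedQueue;

public class UnboundedOfferExample {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
        boolean accepted = q.offer("x");   // always true on an unbounded queue
        q.add("y");                        // never throws IllegalStateException
        System.out.println(accepted + " " + q);
    }
}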
......@@ -374,17 +374,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
null, null, 1);
}
/** Updater for casHead */
private static final
AtomicReferenceFieldUpdater<ConcurrentSkipListMap, HeadIndex>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(ConcurrentSkipListMap.class, HeadIndex.class, "head");
/**
* compareAndSet head node
*/
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
return headUpdater.compareAndSet(this, cmp, val);
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
/* ---------------- Nodes -------------- */
......@@ -423,28 +417,18 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
this.next = next;
}
/** Updater for casNext */
static final AtomicReferenceFieldUpdater<Node, Node>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
/** Updater for casValue */
static final AtomicReferenceFieldUpdater<Node, Object>
valueUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "value");
/**
* compareAndSet value field
*/
boolean casValue(Object cmp, Object val) {
return valueUpdater.compareAndSet(this, cmp, val);
return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
/**
* compareAndSet next field
*/
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return nextUpdater.compareAndSet(this, cmp, val);
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
......@@ -522,6 +506,14 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
return null;
return new AbstractMap.SimpleImmutableEntry<K,V>(key, v);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long valueOffset =
objectFieldOffset(UNSAFE, "value", Node.class);
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", Node.class);
}
/* ---------------- Indexing -------------- */
......@@ -547,16 +539,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
this.right = right;
}
/** Updater for casRight */
static final AtomicReferenceFieldUpdater<Index, Index>
rightUpdater = AtomicReferenceFieldUpdater.newUpdater
(Index.class, Index.class, "right");
/**
* compareAndSet right field
*/
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return rightUpdater.compareAndSet(this, cmp, val);
return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
/**
......@@ -591,6 +578,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
final boolean unlink(Index<K,V> succ) {
return !indexesDeletedNode() && casRight(succ, succ.right);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long rightOffset =
objectFieldOffset(UNSAFE, "right", Index.class);
}
/* ---------------- Head nodes -------------- */
......@@ -640,7 +633,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
* cast key as Comparable, which may cause ClassCastException,
* which is propagated back to caller.
*/
private Comparable<? super K> comparable(Object key) throws ClassCastException {
private Comparable<? super K> comparable(Object key)
throws ClassCastException {
if (key == null)
throw new NullPointerException();
if (comparator != null)
......@@ -799,68 +793,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
}
/**
* Specialized variant of findNode to perform Map.get. Does a weak
* traversal, not bothering to fix any deleted index nodes,
* returning early if it happens to see key in index, and passing
* over any deleted base nodes, falling back to getUsingFindNode
* only if it would otherwise return value from an ongoing
* deletion. Also uses "bound" to eliminate need for some
* comparisons (see Pugh Cookbook). Also folds uses of null checks
* and node-skipping because markers have null keys.
* Gets value for key using findNode.
* @param okey the key
* @return the value, or null if absent
*/
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
Node<K,V> bound = null;
Index<K,V> q = head;
Index<K,V> r = q.right;
Node<K,V> n;
K k;
int c;
for (;;) {
Index<K,V> d;
// Traverse rights
if (r != null && (n = r.node) != bound && (k = n.key) != null) {
if ((c = key.compareTo(k)) > 0) {
q = r;
r = r.right;
continue;
} else if (c == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else
bound = n;
}
// Traverse down
if ((d = q.down) != null) {
q = d;
r = d.right;
} else
break;
}
// Traverse nexts
for (n = q.node.next; n != null; n = n.next) {
if ((k = n.key) != null) {
if ((c = key.compareTo(k)) == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else if (c < 0)
break;
}
}
return null;
}
/**
* Performs map.get via findNode. Used as a backup if doGet
* encounters an in-progress deletion.
* @param key the key
* @return the value, or null if absent
*/
private V getUsingFindNode(Comparable<? super K> key) {
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
......@@ -943,7 +881,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x8001) != 0) // test highest and lowest bits
if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
......@@ -1256,7 +1194,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return (b.isBaseHeader())? null : b;
return b.isBaseHeader() ? null : b;
Node<K,V> f = n.next; // inconsistent read
if (n != b.next)
break;
......@@ -1374,7 +1312,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return ((rel & LT) == 0 || b.isBaseHeader())? null : b;
return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
......@@ -1390,7 +1328,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
(c < 0 && (rel & LT) == 0))
return n;
if ( c <= 0 && (rel & LT) != 0)
return (b.isBaseHeader())? null : b;
return b.isBaseHeader() ? null : b;
b = n;
n = f;
}
......@@ -1744,7 +1682,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE)? Integer.MAX_VALUE : (int)count;
return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
/**
......@@ -2099,7 +2037,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/
public K lowerKey(K key) {
Node<K,V> n = findNear(key, LT);
return (n == null)? null : n.key;
return (n == null) ? null : n.key;
}
/**
......@@ -2123,7 +2061,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/
public K floorKey(K key) {
Node<K,V> n = findNear(key, LT|EQ);
return (n == null)? null : n.key;
return (n == null) ? null : n.key;
}
/**
......@@ -2145,7 +2083,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/
public K ceilingKey(K key) {
Node<K,V> n = findNear(key, GT|EQ);
return (n == null)? null : n.key;
return (n == null) ? null : n.key;
}
/**
......@@ -2169,7 +2107,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/
public K higherKey(K key) {
Node<K,V> n = findNear(key, GT);
return (n == null)? null : n.key;
return (n == null) ? null : n.key;
}
/**
......@@ -2342,7 +2280,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
return list;
}
static final class KeySet<E> extends AbstractSet<E> implements NavigableSet<E> {
static final class KeySet<E>
extends AbstractSet<E> implements NavigableSet<E> {
private final ConcurrentNavigableMap<E,Object> m;
KeySet(ConcurrentNavigableMap<E,Object> map) { m = map; }
public int size() { return m.size(); }
......@@ -2359,11 +2298,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
public E last() { return m.lastKey(); }
public E pollFirst() {
Map.Entry<E,Object> e = m.pollFirstEntry();
return e == null? null : e.getKey();
return (e == null) ? null : e.getKey();
}
public E pollLast() {
Map.Entry<E,Object> e = m.pollLastEntry();
return e == null? null : e.getKey();
return (e == null) ? null : e.getKey();
}
public Iterator<E> iterator() {
if (m instanceof ConcurrentSkipListMap)
......@@ -2710,9 +2649,9 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
rel &= ~m.LT;
}
if (tooLow(key))
return ((rel & m.LT) != 0)? null : lowestEntry();
return ((rel & m.LT) != 0) ? null : lowestEntry();
if (tooHigh(key))
return ((rel & m.LT) != 0)? highestEntry() : null;
return ((rel & m.LT) != 0) ? highestEntry() : null;
for (;;) {
Node<K,V> n = m.findNear(key, rel);
if (n == null || !inBounds(n.key))
......@@ -2783,7 +2722,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
public V remove(Object key) {
K k = (K)key;
return (!inBounds(k))? null : m.remove(k);
return (!inBounds(k)) ? null : m.remove(k);
}
public int size() {
......@@ -2794,7 +2733,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
if (n.getValidValue() != null)
++count;
}
return count >= Integer.MAX_VALUE? Integer.MAX_VALUE : (int)count;
return count >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)count;
}
public boolean isEmpty() {
......@@ -2972,27 +2911,27 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
}
public K firstKey() {
return isDescending? highestKey() : lowestKey();
return isDescending ? highestKey() : lowestKey();
}
public K lastKey() {
return isDescending? lowestKey() : highestKey();
return isDescending ? lowestKey() : highestKey();
}
public Map.Entry<K,V> firstEntry() {
return isDescending? highestEntry() : lowestEntry();
return isDescending ? highestEntry() : lowestEntry();
}
public Map.Entry<K,V> lastEntry() {
return isDescending? lowestEntry() : highestEntry();
return isDescending ? lowestEntry() : highestEntry();
}
public Map.Entry<K,V> pollFirstEntry() {
return isDescending? removeHighest() : removeLowest();
return isDescending ? removeHighest() : removeLowest();
}
public Map.Entry<K,V> pollLastEntry() {
return isDescending? removeLowest() : removeHighest();
return isDescending ? removeLowest() : removeHighest();
}
/* ---------------- Submap Views -------------- */
......@@ -3141,4 +3080,22 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", ConcurrentSkipListMap.class);
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}
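The ConcurrentSkipListMap hunks above drop the AtomicReferenceFieldUpdater fields in favor of direct sun.misc.Unsafe CAS on cached field offsets. A hedged sketch of that pattern on a hypothetical class (the class and field names are illustrative, and Unsafe.getUnsafe() only succeeds where it is permitted, e.g. for code on the boot class path):

class CasHolder {
    volatile Object value;

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long valueOffset;
    static {
        try {
            valueOffset = UNSAFE.objectFieldOffset(
                CasHolder.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            // mirror the helper above: convert the exception to the corresponding Error
            NoSuchFieldError error = new NoSuchFieldError("value");
            error.initCause(e);
            throw error;
        }
    }

    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }
}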
......@@ -832,7 +832,7 @@ public class CopyOnWriteArrayList<E>
}
/**
* Save the state of the list to a stream (i.e., serialize it).
* Saves the state of the list to a stream (that is, serializes it).
*
* @serialData The length of the array backing the list is emitted
* (int), followed by all of its elements (each an Object)
......@@ -842,27 +842,25 @@ public class CopyOnWriteArrayList<E>
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{
// Write out element count, and any hidden stuff
s.defaultWriteObject();
Object[] elements = getArray();
int len = elements.length;
// Write out array length
s.writeInt(len);
s.writeInt(elements.length);
// Write out all elements in the proper order.
for (int i = 0; i < len; i++)
s.writeObject(elements[i]);
for (Object element : elements)
s.writeObject(element);
}
/**
* Reconstitute the list from a stream (i.e., deserialize it).
* Reconstitutes the list from a stream (that is, deserializes it).
*
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// Read in size, and any hidden stuff
s.defaultReadObject();
// bind to new lock
......
......@@ -525,8 +525,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private volatile long eventWaiters;
private static final int EVENT_COUNT_SHIFT = 32;
private static final long WAITER_ID_MASK = (1L << 16) - 1L;
private static final int EVENT_COUNT_SHIFT = 32;
private static final int WAITER_ID_MASK = (1 << 16) - 1;
/**
* A counter for events that may wake up worker threads:
......@@ -615,7 +615,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// are usually manually inlined by callers
/**
* Increments running count part of workerCounts
* Increments running count part of workerCounts.
*/
final void incrementRunningCount() {
int c;
......@@ -625,7 +625,17 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
* Tries to decrement running count unless already zero
* Tries to increment running count part of workerCounts.
*/
final boolean tryIncrementRunningCount() {
int c;
return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
c = workerCounts,
c + ONE_RUNNING);
}
/**
* Tries to decrement running count unless already zero.
*/
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
......@@ -698,10 +708,11 @@ public class ForkJoinPool extends AbstractExecutorService {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
ws = Arrays.copyOf(ws, n << 1);
ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
workers = ws; // volatile array write ensures slot visibility
int c = eventCount; // advance event count to ensure visibility
UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally {
lock.unlock();
}
......@@ -734,7 +745,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
......@@ -746,24 +757,23 @@ public class ForkJoinPool extends AbstractExecutorService {
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
* upon releasing two workers, letting others take over.
* upon releasing four workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
boolean releasedOne = false;
int releases = 4;
ForkJoinWorkerThread w; int id;
while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
if (releasedOne) // exit on second release
if (--releases == 0)
break;
releasedOne = true;
}
if (eventCount != ec)
break;
......@@ -793,7 +803,7 @@ public class ForkJoinPool extends AbstractExecutorService {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
(((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
(((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
......@@ -820,9 +830,9 @@ public class ForkJoinPool extends AbstractExecutorService {
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime();
long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
if (eventCount != ec || w.isTerminating())
if (w.isTerminating() || eventCount != ec)
break; // recheck after clear
if (untimed)
LockSupport.park(w);
......@@ -860,7 +870,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
(workerCounts & RUNNING_COUNT_MASK) < parallelism &&
(runState >= TERMINATING ||
(workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
......@@ -914,12 +925,8 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
}
w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count
UNSAFE.compareAndSwapInt(this, eventCountOffset,
c = eventCount, c+1);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
break; // add at most one unless total below target
}
}
}
if (eventWaiters != 0L)
......@@ -955,7 +962,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else if ((h = eventWaiters) != 0L) {
long nh;
int id = ((int)(h & WAITER_ID_MASK)) - 1;
int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
......@@ -1003,24 +1010,31 @@ public class ForkJoinPool extends AbstractExecutorService {
int pc = parallelism;
while (w.runState == 0) {
int rs = runState;
if (rs >= TERMINATING) { // propagate shutdown
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false;
int wc = workerCounts;
if (rs == SHUTDOWN) { // all inactive and shut down
tryTerminate(false);
continue;
}
}
int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
workerCounts == wc && // try to suspend as spare
workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
else if (!ran) {
else if (ran)
break;
else {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
......@@ -1032,8 +1046,6 @@ public class ForkJoinPool extends AbstractExecutorService {
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
else
break;
}
}
......@@ -1043,35 +1055,67 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param joinMe the task to join
* @param worker the current worker thread
* @param timed true if wait should time out
* @param nanos timeout value if timed
*/
final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
boolean timed, long nanos) {
long startTime = timed ? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
boolean running = true; // false when count decremented
while (joinMe.status >= 0) {
int wc;
worker.helpJoinTask(joinMe);
if (runState >= TERMINATING) {
joinMe.cancelIgnoringExceptions();
break;
}
running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0)
break;
else if (retries > 0)
if (retries > 0) {
--retries;
else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
int stat, c; long h;
while ((stat = joinMe.status) >= 0 &&
(h = eventWaiters) != 0L && // help release others
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
continue;
}
int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) != 0) {
if (running) {
if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
continue;
running = false;
}
long h = eventWaiters;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
if (stat >= 0 &&
((workerCounts & RUNNING_COUNT_MASK) == 0 ||
(stat =
joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
helpMaintainParallelism(); // timeout or no running workers
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
if (stat < 0)
break; // else restart
if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
long ms; int ns;
if (!timed) {
ms = JOIN_TIMEOUT_MILLIS;
ns = 0;
}
else { // at most JOIN_TIMEOUT_MILLIS per wait
long nt = nanos - (System.nanoTime() - startTime);
if (nt <= 0L)
break;
ms = nt / 1000000;
if (ms > JOIN_TIMEOUT_MILLIS) {
ms = JOIN_TIMEOUT_MILLIS;
ns = 0;
}
else
ns = (int) (nt % 1000000);
}
joinMe.internalAwaitDone(ms, ns);
}
if (joinMe.status < 0)
break;
}
helpMaintainParallelism();
}
if (!running) {
int c;
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
}
}
......@@ -1082,9 +1126,10 @@ public class ForkJoinPool extends AbstractExecutorService {
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) != 0 &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
if ((wc & RUNNING_COUNT_MASK) == 0)
helpMaintainParallelism();
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
......@@ -1129,12 +1174,11 @@ public class ForkJoinPool extends AbstractExecutorService {
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
termination.arrive();
termination.forceTermination();
}
return true;
}
/**
* Actions on transition to TERMINATING
*
......@@ -1325,17 +1369,13 @@ public class ForkJoinPool extends AbstractExecutorService {
// Execution methods
/**
* Common code for execute, invoke and submit
* Submits task and creates, starts, or resumes some workers if necessary
*/
private <T> void doSubmit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
helpMaintainParallelism(); // create, start, or resume some workers
helpMaintainParallelism();
}
/**
......@@ -1348,8 +1388,33 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
doSubmit(task);
return task.join();
if (task == null)
throw new NullPointerException();
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
return task.invoke(); // bypass submit if in same pool
else {
doSubmit(task);
return task.join();
}
}
/**
* Unless terminating, forks task if within an ongoing FJ
* computation in the current pool, else submits as external task.
*/
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
task.fork();
else
doSubmit(task);
}
/**
......@@ -1361,7 +1426,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
doSubmit(task);
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
}
// AbstractExecutorService methods
......@@ -1372,12 +1439,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
doSubmit(job);
forkOrSubmit(job);
}
/**
......@@ -1390,7 +1459,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
doSubmit(task);
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task;
}
......@@ -1400,8 +1471,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
doSubmit(job);
forkOrSubmit(job);
return job;
}
......@@ -1411,8 +1484,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
doSubmit(job);
forkOrSubmit(job);
return job;
}
......@@ -1422,12 +1497,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
doSubmit(job);
forkOrSubmit(job);
return job;
}
......@@ -1725,8 +1802,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
* ignored or suppressed interruption, or are waiting for IO,
* causing this executor not to properly terminate. (See the
* advisory notes for class {@link ForkJoinTask} stating that
* tasks should not normally entail blocking operations. But if
* they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
......@@ -1764,10 +1844,11 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
termination.awaitAdvanceInterruptibly(0, timeout, unit);
} catch (TimeoutException ex) {
return false;
}
return true;
}
/**
......
......@@ -38,16 +38,18 @@ package java.util.concurrent;
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.RejectedExecutionException;
/**
* A thread managed by a {@link ForkJoinPool}. This class is
* subclassable solely for the sake of adding functionality -- there
* are no overridable methods dealing with scheduling or execution.
* However, you can override initialization and termination methods
* surrounding the main task processing loop. If you do create such a
* subclass, you will also need to supply a custom {@link
* ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
* ForkJoinPool}.
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
* in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
......@@ -376,7 +378,7 @@ public class ForkJoinWorkerThread extends Thread {
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke @code{super.onStart()} at the beginning of the method.
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
......@@ -384,7 +386,7 @@ public class ForkJoinWorkerThread extends Thread {
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
seed = rs == 0? 1 : rs; // seed must be nonzero
seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
......@@ -426,7 +428,7 @@ public class ForkJoinWorkerThread extends Thread {
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* ForkJoinTasks.
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
......@@ -628,6 +630,19 @@ public class ForkJoinWorkerThread extends Thread {
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
/*
* Note: here and in related methods, as a
* performance (not correctness) issue, we'd like
* to encourage compiler not to arbitrarily
* postpone setting sp after successful CAS.
* Currently there is no intrinsic for arranging
* this, but using Unsafe putOrderedInt may be a
* preferable strategy on some compilers even
* though its main effect is a pre-, not post-
* fence. To simplify possible changes, the option
* is left in comments next to the associated
* assignments.
*/
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
return t;
......@@ -777,10 +792,10 @@ public class ForkJoinWorkerThread extends Thread {
// Run State management
// status check methods used mainly by ForkJoinPool
final boolean isRunning() { return runState == 0; }
final boolean isTerminated() { return (runState & TERMINATED) != 0; }
final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isRunning() { return runState == 0; }
final boolean isTerminated() { return (runState & TERMINATED) != 0; }
final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isTerminating() {
if ((runState & TERMINATING) != 0)
......@@ -884,8 +899,7 @@ public class ForkJoinWorkerThread extends Thread {
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
if (cj != null) {
currentJoin = null;
if (cj != null && cj.status >= 0) {
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
......@@ -893,10 +907,8 @@ public class ForkJoinWorkerThread extends Thread {
}
}
ForkJoinTask<?> cs = currentSteal;
if (cs != null) {
currentSteal = null;
if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions();
}
while (base != sp) {
ForkJoinTask<?> t = deqTask();
if (t != null)
......@@ -959,57 +971,23 @@ public class ForkJoinWorkerThread extends Thread {
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
* @param timed true if use timed wait
* @param nanos wait time if timed
*/
final void joinTask(ForkJoinTask<?> joinMe) {
final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
if (sp != base)
localHelpJoinTask(joinMe);
if (joinMe.status >= 0)
pool.awaitJoin(joinMe, this);
pool.awaitJoin(joinMe, this, timed, nanos);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
* Run tasks in local queue until given task is done.
*
* @param joinMe the task to join
*/
private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
int s;
ForkJoinTask<?>[] q;
while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to a stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
/*
* This recheck (and similarly in helpJoinTask)
* handles cases where joinMe is independently
* cancelled or forced even though there is other work
* available. Back out of the pop by putting t back
* into slot before we commit by writing sp.
*/
if (joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
break;
}
sp = s;
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
/**
* Unless terminating, tries to locate and help perform tasks for
* a stealer of the given task, or in turn one of its stealers.
* Traces currentSteal->currentJoin links looking for a thread
* working on a descendant of the given task and with a non-empty
* queue to steal back and execute tasks from.
* Tries to locate and help perform tasks for a stealer of the
* given task, or in turn one of its stealers. Traces
* currentSteal->currentJoin links looking for a thread working on
* a descendant of the given task and with a non-empty queue to
* steal back and execute tasks from.
*
* The implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
......@@ -1019,77 +997,127 @@ public class ForkJoinWorkerThread extends Thread {
* don't work out.
*
* @param joinMe the task to join
* @param running if false, then must update pool count upon
* running a task
* @return value of running on exit
*/
final void helpJoinTask(ForkJoinTask<?> joinMe) {
ForkJoinWorkerThread[] ws;
int n;
if (joinMe.status < 0) // already done
return;
if ((runState & TERMINATING) != 0) { // cancel if shutting down
joinMe.cancelIgnoringExceptions();
return;
final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
/*
* Initial checks to (1) abort if terminating; (2) clean out
* old cancelled tasks from local queue; (3) if joinMe is next
* task, run it; (4) omit scan if local queue nonempty (since
* it may contain non-descendents of joinMe).
*/
ForkJoinPool p = pool;
for (;;) {
ForkJoinTask<?>[] q;
int s;
if (joinMe.status < 0)
return running;
else if ((runState & TERMINATING) != 0) {
joinMe.cancelIgnoringExceptions();
return running;
}
else if ((s = sp) == base || (q = queue) == null)
break; // queue empty
else {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null)
break; // lost to a stealer
else if (t != joinMe && t.status >= 0)
return running; // cannot safely help
else if ((running ||
(running = p.tryIncrementRunningCount())) &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
if ((ws = pool.workers) == null || (n = ws.length) <= 1)
return; // need at least 2 workers
ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task
for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) {
for (int j = 0; ; ++j) { // search array
if (j < n) {
ForkJoinTask<?> vs;
if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) {
if (joinMe.status < 0 || task.status < 0)
return; // stale or done
if (vs == task) {
thread.stealHint = j;
break; // save hint for next time
int n; // worker array size
ForkJoinWorkerThread[] ws = p.workers;
if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task
outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) {
for (int j = 0; ; ++j) { // search array
if (j < n) {
ForkJoinTask<?> vs;
if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) {
if (joinMe.status < 0)
break outer;
if (vs == task) {
if (task.status < 0)
break outer; // stale
thread.stealHint = j;
break; // save hint for next time
}
}
}
else
break outer; // no stealer
}
else
return; // no stealer
}
}
for (;;) { // Try to help v, using specialized form of deqTask
if (joinMe.status < 0)
return;
int b = v.base;
ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null)
break;
int i = (q.length - 1) & b;
long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i];
int pid = poolIndex;
ForkJoinTask<?> ps = currentSteal;
if (task.status < 0)
return; // stale or done
if (t != null && v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
if (joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
return; // back out on cancel
// Try to help v, using specialized form of deqTask
for (;;) {
if (joinMe.status < 0)
break outer;
int b = v.base;
ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null)
break; // empty
int i = (q.length - 1) & b;
long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null &&
(running ||
(running = p.tryIncrementRunningCount())) &&
v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
if (t != joinMe && joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
break outer; // joinMe cancelled; back out
}
v.base = b;
if (t.status >= 0) {
ForkJoinTask<?> ps = currentSteal;
int pid = poolIndex;
v.stealHint = pid;
UNSAFE.putOrderedObject(this,
currentStealOffset, t);
t.quietlyExec();
UNSAFE.putOrderedObject(this,
currentStealOffset, ps);
}
}
else if ((runState & TERMINATING) != 0) {
joinMe.cancelIgnoringExceptions();
break outer;
}
v.base = b;
v.stealHint = pid;
UNSAFE.putOrderedObject(this, currentStealOffset, t);
t.quietlyExec();
UNSAFE.putOrderedObject(this, currentStealOffset, ps);
}
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task ||
joinMe.status < 0)
break; // done, stale, dead-end, or cyclic
task = next;
thread = v;
}
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task ||
joinMe.status < 0)
return;
task = next;
thread = v;
}
return running;
}
/**
......
......@@ -1029,6 +1029,8 @@ public class LinkedBlockingDeque<E>
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
*
* @return an iterator over the elements in this deque in reverse order
*/
public Iterator<E> descendingIterator() {
return new DescendingItr();
......
......@@ -189,14 +189,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
}
/**
* Creates a node and links it at end of queue.
* Links node at end of queue.
*
* @param x the item
* @param node the node
*/
private void enqueue(E x) {
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = new Node<E>(x);
last = last.next = node;
}
/**
......@@ -282,7 +282,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(e);
enqueue(new Node<E>(e));
++n;
}
count.set(n);
......@@ -332,6 +332,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
......@@ -347,7 +348,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
......@@ -382,7 +383,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
......@@ -411,11 +412,12 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(e);
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
......@@ -559,6 +561,27 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
}
}
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
/**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
......@@ -645,7 +668,20 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
public String toString() {
fullyLock();
try {
return super.toString();
Node<E> p = head.next;
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
......@@ -727,12 +763,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
/**
* Returns an iterator over the elements in this queue in proper sequence.
* The returned {@code Iterator} is a "weakly consistent" iterator that
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
* ConcurrentModificationException}, and guarantees to traverse
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
......
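The LinkedBlockingQueue put/offer changes above construct the new Node before the put lock is taken, so enqueue now links a pre-built node rather than allocating one. A hedged, generic sketch of that shape with hypothetical names, assuming the intent is simply to keep allocation outside the locked region:

import java.util.concurrent.locks.ReentrantLock;

public class PreallocateOutsideLock {
    static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final ReentrantLock putLock = new ReentrantLock();
    private Node<String> last = new Node<String>(null);

    void put(String e) throws InterruptedException {
        Node<String> node = new Node<String>(e);  // allocate before locking
        putLock.lockInterruptibly();
        try {
            last = last.next = node;              // only link while holding the lock
        } finally {
            putLock.unlock();
        }
    }
}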
......@@ -37,10 +37,10 @@ package java.util.concurrent;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
......@@ -450,7 +450,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
}
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
......@@ -516,7 +516,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
// assert isData;
// assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
......@@ -569,7 +569,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
@SuppressWarnings("unchecked")
static <E> E cast(Object item) {
// assert item == null || item.getClass() != Node.class;
// assert item == null || item.getClass() != Node.class;
return (E) item;
}
......@@ -588,7 +588,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry: for (;;) { // restart on append race
retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
......@@ -599,7 +600,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null? q : n)) {
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
......@@ -684,7 +685,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
for (;;) {
Object item = s.item;
if (item != e) { // matched
// assert item != s;
// assert item != s;
s.forgetContents(); // avoid garbage
return this.<E>cast(item);
}
......@@ -809,22 +810,61 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Moves to next node after prev, or first node if prev null.
*/
private void advance(Node prev) {
lastPred = lastRet;
lastRet = prev;
for (Node p = (prev == null) ? head : succ(prev);
p != null; p = succ(p)) {
Object item = p.item;
if (p.isData) {
if (item != null && item != p) {
nextItem = LinkedTransferQueue.this.<E>cast(item);
nextNode = p;
/*
* To track and avoid buildup of deleted nodes in the face
* of calls to both Queue.remove and Itr.remove, we must
* include variants of unsplice and sweep upon each
* advance: Upon Itr.remove, we may need to catch up links
* from lastPred, and upon other removes, we might need to
* skip ahead from stale nodes and unsplice deleted ones
* found while advancing.
*/
Node r, b; // reset lastPred upon possible deletion of lastRet
if ((r = lastRet) != null && !r.isMatched())
lastPred = r; // next lastPred is old lastRet
else if ((b = lastPred) == null || b.isMatched())
lastPred = null; // at start of list
else {
Node s, n; // help with removal of lastPred.next
while ((s = b.next) != null &&
s != b && s.isMatched() &&
(n = s.next) != null && n != s)
b.casNext(s, n);
}
this.lastRet = prev;
for (Node p = prev, s, n;;) {
s = (p == null) ? head : p.next;
if (s == null)
break;
else if (s == p) {
p = null;
continue;
}
Object item = s.item;
if (s.isData) {
if (item != null && item != s) {
nextItem = LinkedTransferQueue.<E>cast(item);
nextNode = s;
return;
}
}
else if (item == null)
break;
// assert s.isMatched();
if (p == null)
p = s;
else if ((n = s.next) == null)
break;
else if (s == n)
p = null;
else
p.casNext(s, n);
}
nextNode = null;
nextItem = null;
}
Itr() {
......@@ -844,10 +884,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
}
public final void remove() {
Node p = lastRet;
if (p == null) throw new IllegalStateException();
if (p.tryMatchData())
unsplice(lastPred, p);
final Node lastRet = this.lastRet;
if (lastRet == null)
throw new IllegalStateException();
this.lastRet = null;
if (lastRet.tryMatchData())
unsplice(lastPred, lastRet);
}
}
......@@ -997,8 +1039,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by
* {@link BlockingQueue#offer(Object) BlockingQueue.offer})
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
......@@ -1130,15 +1171,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
}
/**
* Returns an iterator over the elements in this queue in proper
* sequence, from head to tail.
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw
* {@link ConcurrentModificationException ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed
* to) reflect any modifications subsequent to construction.
* will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException}, and guarantees to traverse
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
......@@ -1202,6 +1243,28 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
return findAndRemove(o);
}
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
if (p.isData) {
if (item != null && item != p && o.equals(item))
return true;
}
else if (item == null)
break;
}
return false;
}
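
The newly added contains scans the list and checks only unmatched data nodes; a minimal sketch of its observable behaviour (element values are illustrative):

import java.util.concurrent.LinkedTransferQueue;

public class ContainsDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
        q.offer("x");

        System.out.println(q.contains("x"));   // true: an unmatched data node holds "x"
        System.out.println(q.contains("y"));   // false: no such element
        System.out.println(q.contains(null));  // false, by the explicit null check above
    }
}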
/**
* Always returns {@code Integer.MAX_VALUE} because a
* {@code LinkedTransferQueue} is not capacity constrained.
......
......@@ -360,8 +360,12 @@ public class ScheduledThreadPoolExecutor
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic)
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
......
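
The onShutdown change above now cancels, rather than silently drops, delayed tasks when neither shutdown policy keeps them. A hedged sketch of the visible effect (delay and task body are arbitrary):

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownCancelDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1);
        stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        ScheduledFuture<?> f = stpe.schedule(new Runnable() {
            public void run() { System.out.println("should not run"); }
        }, 1, TimeUnit.HOURS);

        stpe.shutdown();                      // onShutdown() purges pending delayed tasks
        System.out.println(f.isCancelled());  // with the cancelling variant above: true
        stpe.awaitTermination(1, TimeUnit.SECONDS);
    }
}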
......@@ -163,7 +163,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
/**
* Shared internal API for dual stacks and queues.
*/
static abstract class Transferer {
abstract static class Transferer {
/**
* Performs a put or take.
*
......@@ -190,7 +190,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
......@@ -241,19 +241,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
this.item = item;
}
static final AtomicReferenceFieldUpdater<SNode, SNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "next");
boolean casNext(SNode cmp, SNode val) {
return (cmp == next &&
nextUpdater.compareAndSet(this, cmp, val));
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
static final AtomicReferenceFieldUpdater<SNode, SNode>
matchUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "match");
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
......@@ -264,7 +256,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/
boolean tryMatch(SNode s) {
if (match == null &&
matchUpdater.compareAndSet(this, null, s)) {
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
......@@ -279,23 +271,28 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
matchUpdater.compareAndSet(this, null, this);
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", SNode.class);
private static final long matchOffset =
objectFieldOffset(UNSAFE, "match", SNode.class);
}
/** The head (top) of the stack */
volatile SNode head;
static final AtomicReferenceFieldUpdater<TransferStack, SNode>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferStack.class, SNode.class, "head");
boolean casHead(SNode h, SNode nh) {
return h == head && headUpdater.compareAndSet(this, h, nh);
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
/**
......@@ -338,7 +335,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null)? REQUEST : DATA;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
......@@ -356,7 +353,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return mode == REQUEST? m.item : s.item;
return (mode == REQUEST) ? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
......@@ -372,7 +369,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (mode == REQUEST)? m.item : s.item;
return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
......@@ -423,11 +420,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
long lastTime = (timed)? System.nanoTime() : 0;
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
int spins = (shouldSpin(s)?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
......@@ -444,7 +441,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
}
}
if (spins > 0)
spins = shouldSpin(s)? (spins-1) : 0;
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
......@@ -499,6 +496,12 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
p = n;
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", TransferStack.class);
}
/** Dual Queue */
......@@ -524,29 +527,21 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
this.isData = isData;
}
static final AtomicReferenceFieldUpdater<QNode, QNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, QNode.class, "next");
boolean casNext(QNode cmp, QNode val) {
return (next == cmp &&
nextUpdater.compareAndSet(this, cmp, val));
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
static final AtomicReferenceFieldUpdater<QNode, Object>
itemUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, Object.class, "item");
boolean casItem(Object cmp, Object val) {
return (item == cmp &&
itemUpdater.compareAndSet(this, cmp, val));
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
itemUpdater.compareAndSet(this, cmp, this);
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
......@@ -561,6 +556,13 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", QNode.class);
private static final long itemOffset =
objectFieldOffset(UNSAFE, "item", QNode.class);
}
/** Head of queue */
......@@ -580,41 +582,30 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
tail = h;
}
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "head");
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
if (h == head && headUpdater.compareAndSet(this, h, nh))
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
tailUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "tail");
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
tailUpdater.compareAndSet(this, t, nt);
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "cleanMe");
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
return (cleanMe == cmp &&
cleanMeUpdater.compareAndSet(this, cmp, val));
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
/**
......@@ -683,7 +674,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
s.item = s;
s.waiter = null;
}
return (x != null)? x : e;
return (x != null) ? x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
......@@ -700,7 +691,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null)? x : e;
return (x != null) ? x : e;
}
}
}
......@@ -716,10 +707,10 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
long lastTime = (timed)? System.nanoTime() : 0;
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
......@@ -799,6 +790,16 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
return; // Postpone cleaning s
}
}
// unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", TransferQueue.class);
private static final long tailOffset =
objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
private static final long cleanMeOffset =
objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
}
/**
......@@ -824,7 +825,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = (fair)? new TransferQueue() : new TransferStack();
transferer = fair ? new TransferQueue() : new TransferStack();
}
/**
......@@ -1141,4 +1142,17 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
transferer = new TransferStack();
}
// Unsafe mechanics
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
}
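
The hunks above replace the per-field AtomicReferenceFieldUpdater instances with direct sun.misc.Unsafe CAS calls keyed by field offsets, resolved once through the shared objectFieldOffset helper. A minimal standalone sketch of that pattern (DemoNode is hypothetical; outside the boot class path Unsafe.getUnsafe() is not accessible, so this sketch falls back to the common reflective lookup):

import java.lang.reflect.Field;

class DemoNode {
    volatile DemoNode next;

    boolean casNext(DemoNode cmp, DemoNode val) {
        // Cheap volatile read first, then CAS on the raw field offset,
        // mirroring the SNode/QNode casNext style above.
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
    private static final long nextOffset;
    static {
        try {
            nextOffset = UNSAFE.objectFieldOffset(
                DemoNode.class.getDeclaredField("next"));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error, as the JDK helper does
            NoSuchFieldError error = new NoSuchFieldError("next");
            error.initCause(e);
            throw error;
        }
    }

    private static sun.misc.Unsafe getUnsafe() {
        try {
            return sun.misc.Unsafe.getUnsafe(); // works for boot-class-path (JDK-internal) code
        } catch (SecurityException denied) {
            try {
                // Well-known reflective fallback for ordinary application code
                Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                return (sun.misc.Unsafe) f.get(null);
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}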
......@@ -1841,6 +1841,43 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
}
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */
/**
......@@ -1961,7 +1998,9 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
......
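
The two hunks above add a diagnostic toString for the pool and a descriptive message to the default rejection handler. A rough demonstration (pool and queue sizes, and the sleeping task, are chosen only to force a rejection deterministically):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolToStringDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1)); // one worker, one queue slot

        Runnable sleeper = new Runnable() {
            public void run() {
                try { Thread.sleep(1000); } catch (InterruptedException ignore) {}
            }
        };

        pool.execute(sleeper);      // occupies the single worker
        pool.execute(sleeper);      // fills the one-slot queue
        System.out.println(pool);   // e.g. "...[Running, pool size = 1, active threads = 1, ...]"

        try {
            pool.execute(sleeper);  // third task: queue full, no spare threads
        } catch (RejectedExecutionException e) {
            System.out.println(e.getMessage()); // now names the task and the pool state
        }
        pool.shutdownNow();
    }
}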
......@@ -308,18 +308,21 @@ public interface Condition {
* condition still does not hold. Typical uses of this method take
* the following form:
*
* <pre>
* synchronized boolean aMethod(long timeout, TimeUnit unit) {
* long nanosTimeout = unit.toNanos(timeout);
* while (!conditionBeingWaitedFor) {
* if (nanosTimeout &gt; 0)
* nanosTimeout = theCondition.awaitNanos(nanosTimeout);
* else
* return false;
* <pre> {@code
* boolean aMethod(long timeout, TimeUnit unit) {
* long nanos = unit.toNanos(timeout);
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (nanos <= 0L)
* return false;
* nanos = theCondition.awaitNanos(nanos);
* }
* // ...
* } finally {
* lock.unlock();
* }
* // ...
* }
* </pre>
* }}</pre>
*
* <p> Design note: This method requires a nanosecond argument so
* as to avoid truncation errors in reporting remaining times.
......@@ -408,18 +411,21 @@ public interface Condition {
*
* <p>The return value indicates whether the deadline has elapsed,
* which can be used as follows:
* <pre>
* synchronized boolean aMethod(Date deadline) {
* <pre> {@code
* boolean aMethod(Date deadline) {
* boolean stillWaiting = true;
* while (!conditionBeingWaitedFor) {
* if (stillWaiting)
* stillWaiting = theCondition.awaitUntil(deadline);
* else
* return false;
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (!stillWaiting)
* return false;
* stillWaiting = theCondition.awaitUntil(deadline);
* }
* // ...
* } finally {
* lock.unlock();
* }
* // ...
* }
* </pre>
* }}</pre>
*
* <p><b>Implementation Considerations</b>
*
......@@ -450,6 +456,15 @@ public interface Condition {
* <p>If any threads are waiting on this condition then one
* is selected for waking up. That thread must then re-acquire the
* lock before returning from {@code await}.
*
* <p><b>Implementation Considerations</b>
*
* <p>An implementation may (and typically does) require that the
* current thread hold the lock associated with this {@code
* Condition} when this method is called. Implementations must
* document this precondition and any actions taken if the lock is
* not held. Typically, an exception such as {@link
* IllegalMonitorStateException} will be thrown.
*/
void signal();
......@@ -459,6 +474,15 @@ public interface Condition {
* <p>If any threads are waiting on this condition then they are
* all woken up. Each thread must re-acquire the lock before it can
* return from {@code await}.
*
* <p><b>Implementation Considerations</b>
*
* <p>An implementation may (and typically does) require that the
* current thread hold the lock associated with this {@code
* Condition} when this method is called. Implementations must
* document this precondition and any actions taken if the lock is
* not held. Typically, an exception such as {@link
* IllegalMonitorStateException} will be thrown.
*/
void signalAll();
}
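
The new Implementation Considerations paragraphs note that signal and signalAll typically require the associated lock to be held. A minimal sketch of the usual pattern (the flag and class name are illustrative; ReentrantLock's Condition does enforce lock ownership and throws IllegalMonitorStateException otherwise):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReadyFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag;

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!flag)
                ready.await();       // releases the lock while waiting
        } finally {
            lock.unlock();
        }
    }

    void setReady() {
        lock.lock();                 // signal() must be called with the lock held,
        try {                        // otherwise IllegalMonitorStateException
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }
}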
......@@ -1426,7 +1426,7 @@ public class Logger {
// we didn't have a previous parent
ref = manager.new LoggerWeakRef(this);
}
ref.setParentRef(new WeakReference<>(parent));
ref.setParentRef(new WeakReference<Logger>(parent));
parent.kids.add(ref);
// As a result of the reparenting, the effective level
......
......@@ -462,7 +462,7 @@ public final class KdcComm {
*/
static class KdcAccessibility {
// Known bad KDCs
private static Set<String> bads = new HashSet<String>();
private static Set<String> bads = new HashSet<>();
private static synchronized void addBad(String kdc) {
if (DEBUG) {
......@@ -492,9 +492,9 @@ public final class KdcComm {
// Returns a preferred KDC list by putting the bad ones at the end
private static synchronized String[] list(String kdcList) {
StringTokenizer st = new StringTokenizer(kdcList);
List<String> list = new ArrayList<String>();
List<String> list = new ArrayList<>();
if (badPolicy == BpType.TRY_LAST) {
List<String> badkdcs = new ArrayList<String>();
List<String> badkdcs = new ArrayList<>();
while (st.hasMoreTokens()) {
String t = st.nextToken();
if (bads.contains(t)) badkdcs.add(t);
......
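
The KdcComm hunk above adopts the JDK 7 diamond operator, letting the compiler infer the constructor's type arguments, while the Logger hunk goes the other way and spells the argument out explicitly. A small illustrative sketch of the shorthand (names and values are made up):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondDemo {
    public static void main(String[] args) {
        // Pre-diamond form: type arguments written on both sides
        List<String> kdcs = new ArrayList<String>();
        // JDK 7 diamond form: <> lets the compiler infer <String, List<String>>
        Map<String, List<String>> byRealm = new HashMap<>();

        kdcs.add("kdc.example.com");      // hypothetical host, for illustration only
        byRealm.put("EXAMPLE.COM", kdcs);
        System.out.println(byRealm);
    }
}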