diff --git a/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp b/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp
index de9ee7869d9bb6788a2b3805b0fb4094b1870a34..40714825b7dbda49e3eaf0e91e01502047d0905a 100644
--- a/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp
+++ b/src/share/vm/gc_implementation/concurrentMarkSweep/concurrentMarkSweepGeneration.cpp
@@ -3847,7 +3847,7 @@ bool CMSConcMarkingTask::get_work_from_overflow_stack(CMSMarkStack* ovflw_stk,
   MutexLockerEx ml(ovflw_stk->par_lock(),
                    Mutex::_no_safepoint_check_flag);
   // Grab up to 1/4 the size of the work queue
-  size_t num = MIN2((size_t)work_q->max_elems()/4,
+  size_t num = MIN2((size_t)(work_q->max_elems() - work_q->size())/4,
                     (size_t)ParGCDesiredObjsFromOverflowList);
   num = MIN2(num, ovflw_stk->length());
   for (int i = (int) num; i > 0; i--) {
@@ -5204,13 +5204,12 @@ CMSParRemarkTask::do_work_steal(int i, Par_MarkRefsIntoAndScanClosure* cl,
   NOT_PRODUCT(int num_steals = 0;)
   oop obj_to_scan;
   CMSBitMap* bm = &(_collector->_markBitMap);
-  size_t num_from_overflow_list =
-           MIN2((size_t)work_q->max_elems()/4,
-                (size_t)ParGCDesiredObjsFromOverflowList);
 
   while (true) {
     // Completely finish any left over work from (an) earlier round(s)
     cl->trim_queue(0);
+    size_t num_from_overflow_list = MIN2((size_t)(work_q->max_elems() - work_q->size())/4,
+                                         (size_t)ParGCDesiredObjsFromOverflowList);
     // Now check if there's any work in the overflow list
     if (_collector->par_take_from_overflow_list(num_from_overflow_list,
                                                 work_q)) {
@@ -5622,13 +5621,12 @@ void CMSRefProcTaskProxy::do_work_steal(int i,
   OopTaskQueue* work_q = work_queue(i);
   NOT_PRODUCT(int num_steals = 0;)
   oop obj_to_scan;
-  size_t num_from_overflow_list =
-           MIN2((size_t)work_q->max_elems()/4,
-                (size_t)ParGCDesiredObjsFromOverflowList);
 
   while (true) {
     // Completely finish any left over work from (an) earlier round(s)
     drain->trim_queue(0);
+    size_t num_from_overflow_list = MIN2((size_t)(work_q->max_elems() - work_q->size())/4,
+                                         (size_t)ParGCDesiredObjsFromOverflowList);
     // Now check if there's any work in the overflow list
     if (_collector->par_take_from_overflow_list(num_from_overflow_list,
                                                 work_q)) {
@@ -9021,7 +9019,7 @@ void ASConcurrentMarkSweepGeneration::shrink_by(size_t desired_bytes) {
 // Transfer some number of overflown objects to usual marking
 // stack. Return true if some objects were transferred.
 bool MarkRefsIntoAndScanClosure::take_from_overflow_list() {
-  size_t num = MIN2((size_t)_mark_stack->capacity()/4,
+  size_t num = MIN2((size_t)(_mark_stack->capacity() - _mark_stack->length())/4,
                     (size_t)ParGCDesiredObjsFromOverflowList);
 
   bool res = _collector->take_from_overflow_list(num, _mark_stack);
diff --git a/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp b/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp
index 7bafe50aa361467d6688e0b037460cc51cbddb99..bec5507ef76946645cf4d3d5a87f86e73338f645 100644
--- a/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp
+++ b/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp
@@ -36,7 +36,7 @@ ParScanThreadState::ParScanThreadState(Space* to_space_,
                                        ObjToScanQueueSet* work_queue_set_,
                                        size_t desired_plab_sz_,
                                        ParallelTaskTerminator& term_) :
-  _to_space(to_space_), _old_gen(old_gen_), _thread_num(thread_num_),
+  _to_space(to_space_), _old_gen(old_gen_), _young_gen(gen_), _thread_num(thread_num_),
   _work_queue(work_queue_set_->queue(thread_num_)), _to_space_full(false),
   _ageTable(false), // false ==> not the global age table, no perf data.
   _to_space_alloc_buffer(desired_plab_sz_),
@@ -57,6 +57,11 @@ ParScanThreadState::ParScanThreadState(Space* to_space_,
   _start = os::elapsedTime();
   _old_gen_closure.set_generation(old_gen_);
   _old_gen_root_closure.set_generation(old_gen_);
+  if (UseCompressedOops) {
+    _overflow_stack = new (ResourceObj::C_HEAP) GrowableArray<oop>(512, true);
+  } else {
+    _overflow_stack = NULL;
+  }
 }
 #ifdef _MSC_VER
 #pragma warning( pop )
@@ -81,7 +86,7 @@ void ParScanThreadState::scan_partial_array_and_push_remainder(oop old) {
   assert(old->is_objArray(), "must be obj array");
   assert(old->is_forwarded(), "must be forwarded");
   assert(Universe::heap()->is_in_reserved(old), "must be in heap.");
-  assert(!_old_gen->is_in(old), "must be in young generation.");
+  assert(!old_gen()->is_in(old), "must be in young generation.");
 
   objArrayOop obj = objArrayOop(old->forwardee());
   // Process ParGCArrayScanChunk elements now
@@ -119,26 +124,68 @@ void ParScanThreadState::scan_partial_array_and_push_remainder(oop old) {
 
 void ParScanThreadState::trim_queues(int max_size) {
   ObjToScanQueue* queue = work_queue();
-  while (queue->size() > (juint)max_size) {
-    oop obj_to_scan;
-    if (queue->pop_local(obj_to_scan)) {
-      note_pop();
-
-      if ((HeapWord *)obj_to_scan < young_old_boundary()) {
-        if (obj_to_scan->is_objArray() &&
-            obj_to_scan->is_forwarded() &&
-            obj_to_scan->forwardee() != obj_to_scan) {
-          scan_partial_array_and_push_remainder(obj_to_scan);
+  do {
+    while (queue->size() > (juint)max_size) {
+      oop obj_to_scan;
+      if (queue->pop_local(obj_to_scan)) {
+        note_pop();
+        if ((HeapWord *)obj_to_scan < young_old_boundary()) {
+          if (obj_to_scan->is_objArray() &&
+              obj_to_scan->is_forwarded() &&
+              obj_to_scan->forwardee() != obj_to_scan) {
+            scan_partial_array_and_push_remainder(obj_to_scan);
+          } else {
+            // object is in to_space
+            obj_to_scan->oop_iterate(&_to_space_closure);
+          }
         } else {
-          // object is in to_space
-          obj_to_scan->oop_iterate(&_to_space_closure);
+          // object is in old generation
+          obj_to_scan->oop_iterate(&_old_gen_closure);
         }
-      } else {
-        // object is in old generation
-        obj_to_scan->oop_iterate(&_old_gen_closure);
       }
     }
+    // For the  case of compressed oops, we have a private, non-shared
+    // overflow stack, so we eagerly drain it so as to more evenly
+    // distribute load early. Note: this may be good to do in
+    // general rather than delay for the final stealing phase.
+    // If applicable, we'll transfer a set of objects over to our
+    // work queue, allowing them to be stolen and draining our
+    // private overflow stack.
+  } while (ParGCTrimOverflow && young_gen()->take_from_overflow_list(this));
+}
+
+bool ParScanThreadState::take_from_overflow_stack() {
+  assert(UseCompressedOops, "Else should not call");
+  assert(young_gen()->overflow_list() == NULL, "Error");
+  ObjToScanQueue* queue = work_queue();
+  GrowableArray<oop>* of_stack = overflow_stack();
+  uint num_overflow_elems = of_stack->length();
+  uint num_take_elems     = MIN2(MIN2((queue->max_elems() - queue->size())/4,
+                                      (juint)ParGCDesiredObjsFromOverflowList),
+                                 num_overflow_elems);
+  // Transfer the most recent num_take_elems from the overflow
+  // stack to our work queue.
+  for (size_t i = 0; i != num_take_elems; i++) {
+    oop cur = of_stack->pop();
+    oop obj_to_push = cur->forwardee();
+    assert(Universe::heap()->is_in_reserved(cur), "Should be in heap");
+    assert(!old_gen()->is_in_reserved(cur), "Should be in young gen");
+    assert(Universe::heap()->is_in_reserved(obj_to_push), "Should be in heap");
+    if (should_be_partially_scanned(obj_to_push, cur)) {
+      assert(arrayOop(cur)->length() == 0, "entire array remaining to be scanned");
+      obj_to_push = cur;
+    }
+    bool ok = queue->push(obj_to_push);
+    assert(ok, "Should have succeeded");
   }
+  assert(young_gen()->overflow_list() == NULL, "Error");
+  return num_take_elems > 0;  // was something transferred?
+}
+
+void ParScanThreadState::push_on_overflow_stack(oop p) {
+  assert(UseCompressedOops, "Else should not call");
+  overflow_stack()->push(p);
+  assert(young_gen()->overflow_list() == NULL, "Error");
 }
 
 HeapWord* ParScanThreadState::alloc_in_to_space_slow(size_t word_sz) {
@@ -425,8 +472,7 @@ void ParNewGenTask::work(int i) {
   ResourceMark rm;
   HandleMark hm;
   // We would need multiple old-gen queues otherwise.
-  guarantee(gch->n_gens() == 2,
-     "Par young collection currently only works with one older gen.");
+  assert(gch->n_gens() == 2, "Par young collection currently only works with one older gen.");
 
   Generation* old_gen = gch->next_gen(_gen);
 
@@ -1169,36 +1215,75 @@ bool ParNewGeneration::should_simulate_overflow() {
 }
 #endif
 
+// In case we are using compressed oops, we need to be careful.
+// If the object being pushed is an object array, then its length
+// field keeps track of the "grey boundary" at which the next
+// incremental scan will be done (see ParGCArrayScanChunk).
+// When using compressed oops, this length field is kept in the
+// lower 32 bits of the erstwhile klass word and cannot be used
+// for the overflow chaining pointer (OCP below). As such the OCP
+// would itself need to be compressed into the top 32-bits in this
+// case. Unfortunately, see below, in the event that we have a
+// promotion failure, the node to be pushed on the list can be
+// outside of the Java heap, so the heap-based pointer compression
+// would not work (we would have potential aliasing between C-heap
+// and Java-heap pointers). For this reason, when using compressed
+// oops, we simply use a worker-thread-local, non-shared overflow
+// list in the form of a growable array, with a slightly different
+// overflow stack draining strategy. If/when we start using fat
+// stacks here, we can go back to using (fat) pointer chains
+// (although some performance comparisons would be useful since
+// single global lists have their own performance disadvantages
+// as we were made painfully aware not long ago, see 6786503).
 #define BUSY (oop(0x1aff1aff))
 void ParNewGeneration::push_on_overflow_list(oop from_space_obj, ParScanThreadState* par_scan_state) {
-  // if the object has been forwarded to itself, then we cannot
-  // use the klass pointer for the linked list.  Instead we have
-  // to allocate an oopDesc in the C-Heap and use that for the linked list.
-  // XXX This is horribly inefficient when a promotion failure occurs
-  // and should be fixed. XXX FIX ME !!!
+  assert(is_in_reserved(from_space_obj), "Should be from this generation");
+  if (UseCompressedOops) {
+    // In the case of compressed oops, we use a private, not-shared
+    // overflow stack.
+    par_scan_state->push_on_overflow_stack(from_space_obj);
+  } else {
+    // if the object has been forwarded to itself, then we cannot
+    // use the klass pointer for the linked list.  Instead we have
+    // to allocate an oopDesc in the C-Heap and use that for the linked list.
+    // XXX This is horribly inefficient when a promotion failure occurs
+    // and should be fixed. XXX FIX ME !!!
 #ifndef PRODUCT
-  Atomic::inc_ptr(&_num_par_pushes);
-  assert(_num_par_pushes > 0, "Tautology");
+    Atomic::inc_ptr(&_num_par_pushes);
+    assert(_num_par_pushes > 0, "Tautology");
 #endif
-  if (from_space_obj->forwardee() == from_space_obj) {
-    oopDesc* listhead = NEW_C_HEAP_ARRAY(oopDesc, 1);
-    listhead->forward_to(from_space_obj);
-    from_space_obj = listhead;
-  }
-  oop observed_overflow_list = _overflow_list;
-  oop cur_overflow_list;
-  do {
-    cur_overflow_list = observed_overflow_list;
-    if (cur_overflow_list != BUSY) {
-      from_space_obj->set_klass_to_list_ptr(cur_overflow_list);
-    } else {
-      from_space_obj->set_klass_to_list_ptr(NULL);
+    if (from_space_obj->forwardee() == from_space_obj) {
+      oopDesc* listhead = NEW_C_HEAP_ARRAY(oopDesc, 1);
+      listhead->forward_to(from_space_obj);
+      from_space_obj = listhead;
     }
-    observed_overflow_list =
-      (oop)Atomic::cmpxchg_ptr(from_space_obj, &_overflow_list, cur_overflow_list);
-  } while (cur_overflow_list != observed_overflow_list);
+    oop observed_overflow_list = _overflow_list;
+    oop cur_overflow_list;
+    do {
+      cur_overflow_list = observed_overflow_list;
+      if (cur_overflow_list != BUSY) {
+        from_space_obj->set_klass_to_list_ptr(cur_overflow_list);
+      } else {
+        from_space_obj->set_klass_to_list_ptr(NULL);
+      }
+      observed_overflow_list =
+        (oop)Atomic::cmpxchg_ptr(from_space_obj, &_overflow_list, cur_overflow_list);
+    } while (cur_overflow_list != observed_overflow_list);
+  }
 }
 
+bool ParNewGeneration::take_from_overflow_list(ParScanThreadState* par_scan_state) {
+  bool res;
+
+  if (UseCompressedOops) {
+    res = par_scan_state->take_from_overflow_stack();
+  } else {
+    res = take_from_overflow_list_work(par_scan_state);
+  }
+  return res;
+}
+
+
 // *NOTE*: The overflow list manipulation code here and
 // in CMSCollector:: are very similar in shape,
 // except that in the CMS case we thread the objects
@@ -1213,14 +1298,13 @@ void ParNewGeneration::push_on_overflow_list(oop from_space_obj, ParScanThreadSt
 // similar changes might be needed.
 // See CMSCollector::par_take_from_overflow_list() for
 // more extensive documentation comments.
-bool
-ParNewGeneration::take_from_overflow_list(ParScanThreadState* par_scan_state) {
+bool ParNewGeneration::take_from_overflow_list_work(ParScanThreadState* par_scan_state) {
   ObjToScanQueue* work_q = par_scan_state->work_queue();
-  assert(work_q->size() == 0, "Should first empty local work queue");
   // How many to take?
-  size_t objsFromOverflow = MIN2((size_t)work_q->max_elems()/4,
+  size_t objsFromOverflow = MIN2((size_t)(work_q->max_elems() - work_q->size())/4,
                                  (size_t)ParGCDesiredObjsFromOverflowList);
 
+  assert(par_scan_state->overflow_stack() == NULL, "Error");
   if (_overflow_list == NULL) return false;
 
   // Otherwise, there was something there; try claiming the list.
diff --git a/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp b/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp
index 51e4c5f39f177572f30cb346fc6674e638ed42d8..7f0015bd676cf00f74402c73a348ebaeb705dc30 100644
--- a/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp
+++ b/src/share/vm/gc_implementation/parNew/parNewGeneration.hpp
@@ -55,6 +55,7 @@ class ParScanThreadState {
   friend class ParScanThreadStateSet;
  private:
   ObjToScanQueue *_work_queue;
+  GrowableArray<oop>* _overflow_stack;
 
   ParGCAllocBuffer _to_space_alloc_buffer;
 
@@ -79,6 +80,9 @@ class ParScanThreadState {
   Space* _to_space;
   Space* to_space() { return _to_space; }
 
+  ParNewGeneration* _young_gen;
+  ParNewGeneration* young_gen() const { return _young_gen; }
+
   Generation* _old_gen;
   Generation* old_gen() { return _old_gen; }
 
@@ -134,6 +138,11 @@ class ParScanThreadState {
   // Decrease queue size below "max_size".
   void trim_queues(int max_size);
 
+  // Private overflow stack usage
+  GrowableArray<oop>* overflow_stack() { return _overflow_stack; }
+  bool take_from_overflow_stack();
+  void push_on_overflow_stack(oop p);
+
   // Is new_obj a candidate for scan_partial_array_and_push_remainder method.
   inline bool should_be_partially_scanned(oop new_obj, oop old_obj) const;
 
@@ -378,13 +387,17 @@ class ParNewGeneration: public DefNewGeneration {
   NOT_PRODUCT(int _overflow_counter;)
   NOT_PRODUCT(bool should_simulate_overflow();)
 
+  // Accessor for overflow list
+  oop overflow_list() { return _overflow_list; }
+
   // Push the given (from-space) object on the global overflow list.
   void push_on_overflow_list(oop from_space_obj, ParScanThreadState* par_scan_state);
 
   // If the global overflow list is non-empty, move some tasks from it
-  // onto "work_q" (which must be empty).  No more than 1/4 of the
-  // max_elems of "work_q" are moved.
+  // onto "work_q" (which need not be empty).  No more than 1/4 of the
+  // available space on "work_q" is used.
   bool take_from_overflow_list(ParScanThreadState* par_scan_state);
+  bool take_from_overflow_list_work(ParScanThreadState* par_scan_state);
 
   // The task queues to be used by parallel GC threads.
   ObjToScanQueueSet* task_queues() {
diff --git a/src/share/vm/runtime/globals.hpp b/src/share/vm/runtime/globals.hpp
index 2d33287fcd4b874063a98afee69e06dc6711c967..b80ec6fbda23bfd1723d1cf205f9fb998c6893ff 100644
--- a/src/share/vm/runtime/globals.hpp
+++ b/src/share/vm/runtime/globals.hpp
@@ -1316,6 +1316,9 @@ class CommandLineFlags {
   product(intx, ParGCArrayScanChunk, 50,                                    \
           "Scan a subset and push remainder, if array is bigger than this") \
                                                                             \
+  product(bool, ParGCTrimOverflow, true,                                    \
+          "Eagerly trim the overflow lists (useful for UseCompressedOops")  \
+                                                                            \
   notproduct(bool, ParGCWorkQueueOverflowALot, false,                       \
           "Whether we should simulate work queue overflow in ParNew")       \
                                                                             \