812 lines
27 KiB
C++
812 lines
27 KiB
C++
/*
|
|
* Copyright (C) 2015-2016 Apple Inc. All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
|
|
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
|
|
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
|
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
|
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
|
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
|
|
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include "config.h"
|
|
#include <wtf/ParkingLot.h>
|
|
|
|
#include <mutex>
|
|
#include <wtf/DataLog.h>
|
|
#include <wtf/HashFunctions.h>
|
|
#include <wtf/StringPrintStream.h>
|
|
#include <wtf/ThreadSpecific.h>
|
|
#include <wtf/Threading.h>
|
|
#include <wtf/Vector.h>
|
|
#include <wtf/WeakRandom.h>
|
|
#include <wtf/WordLock.h>
|
|
|
|
namespace WTF {
|
|
|
|
namespace {
|
|
|
|
static constexpr bool verbose = false;
|
|
|
|
struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
|
|
WTF_MAKE_FAST_ALLOCATED;
|
|
public:
|
|
|
|
ThreadData();
|
|
~ThreadData();
|
|
|
|
Ref<Thread> thread;
|
|
|
|
Mutex parkingLock;
|
|
ThreadCondition parkingCondition;
|
|
|
|
const void* address { nullptr };
|
|
|
|
ThreadData* nextInQueue { nullptr };
|
|
|
|
intptr_t token { 0 };
|
|
};
|
|
|
|
enum class DequeueResult {
|
|
Ignore,
|
|
RemoveAndContinue,
|
|
RemoveAndStop
|
|
};
|
|
|
|
struct Bucket {
|
|
WTF_MAKE_FAST_ALLOCATED;
|
|
public:
|
|
Bucket()
|
|
: random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
|
|
{
|
|
}
|
|
|
|
void enqueue(ThreadData* data)
|
|
{
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
|
|
ASSERT(data->address);
|
|
ASSERT(!data->nextInQueue);
|
|
|
|
if (queueTail) {
|
|
queueTail->nextInQueue = data;
|
|
queueTail = data;
|
|
return;
|
|
}
|
|
|
|
queueHead = data;
|
|
queueTail = data;
|
|
}
|
|
|
|
template<typename Functor>
|
|
void genericDequeue(const Functor& functor)
|
|
{
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));
|
|
|
|
if (!queueHead) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": empty.\n"));
|
|
return;
|
|
}
|
|
|
|
// This loop is a very clever abomination. The induction variables are the pointer to the
|
|
// pointer to the current node, and the pointer to the previous node. This gives us everything
|
|
// we need to both proceed forward to the next node, and to remove nodes while maintaining the
|
|
// queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
|
|
// element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
|
|
// we'd want queueTail to be set to nullptr. This works because:
|
|
//
|
|
// currentPtr == &queueHead
|
|
// previous == nullptr
|
|
//
|
|
// We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
|
|
// that used to point to this node to instead point to this node's successor. Another example:
|
|
// if we were at the second node in the queue, then we'd have:
|
|
//
|
|
// currentPtr == &queueHead->nextInQueue
|
|
// previous == queueHead
|
|
//
|
|
// If this node is not equal to queueTail, then removing it simply means making
|
|
// queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
|
|
// achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
|
|
// queueTail to previous, which in this case is queueHead - thus making the queue look like a
|
|
// proper one-element queue with queueHead == queueTail.
|
|
bool shouldContinue = true;
|
|
ThreadData** currentPtr = &queueHead;
|
|
ThreadData* previous = nullptr;
|
|
|
|
MonotonicTime time = MonotonicTime::now();
|
|
bool timeToBeFair = false;
|
|
if (time > nextFairTime)
|
|
timeToBeFair = true;
|
|
|
|
bool didDequeue = false;
|
|
|
|
while (shouldContinue) {
|
|
ThreadData* current = *currentPtr;
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
|
|
if (!current)
|
|
break;
|
|
DequeueResult result = functor(current, timeToBeFair);
|
|
switch (result) {
|
|
case DequeueResult::Ignore:
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
|
|
previous = current;
|
|
currentPtr = &(*currentPtr)->nextInQueue;
|
|
break;
|
|
case DequeueResult::RemoveAndStop:
|
|
shouldContinue = false;
|
|
FALLTHROUGH;
|
|
case DequeueResult::RemoveAndContinue:
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
|
|
if (current == queueTail)
|
|
queueTail = previous;
|
|
didDequeue = true;
|
|
*currentPtr = current->nextInQueue;
|
|
current->nextInQueue = nullptr;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (timeToBeFair && didDequeue)
|
|
nextFairTime = time + Seconds::fromMilliseconds(random.get());
|
|
|
|
ASSERT(!!queueHead == !!queueTail);
|
|
}
|
|
|
|
ThreadData* dequeue()
|
|
{
|
|
ThreadData* result = nullptr;
|
|
genericDequeue(
|
|
[&] (ThreadData* element, bool) -> DequeueResult {
|
|
result = element;
|
|
return DequeueResult::RemoveAndStop;
|
|
});
|
|
return result;
|
|
}
|
|
|
|
ThreadData* queueHead { nullptr };
|
|
ThreadData* queueTail { nullptr };
|
|
|
|
// This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
|
|
// this lock.
|
|
WordLock lock;
|
|
|
|
MonotonicTime nextFairTime;
|
|
|
|
WeakRandom random;
|
|
|
|
// Put some distane between buckets in memory. This is one of several mitigations against false
|
|
// sharing.
|
|
char padding[64];
|
|
};
|
|
|
|
struct Hashtable;
|
|
|
|
// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
|
|
Vector<Hashtable*>* hashtables;
|
|
WordLock hashtablesLock;
|
|
|
|
struct Hashtable {
|
|
unsigned size;
|
|
Atomic<Bucket*> data[1];
|
|
|
|
static Hashtable* create(unsigned size)
|
|
{
|
|
ASSERT(size >= 1);
|
|
|
|
Hashtable* result = static_cast<Hashtable*>(
|
|
fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
|
|
result->size = size;
|
|
|
|
{
|
|
// This is not fast and it's not data-access parallel, but that's fine, because
|
|
// hashtable resizing is guaranteed to be rare and it will never happen in steady
|
|
// state.
|
|
WordLockHolder locker(hashtablesLock);
|
|
if (!hashtables)
|
|
hashtables = new Vector<Hashtable*>();
|
|
hashtables->append(result);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static void destroy(Hashtable* hashtable)
|
|
{
|
|
{
|
|
// This is not fast, but that's OK. See comment in create().
|
|
WordLockHolder locker(hashtablesLock);
|
|
hashtables->removeFirst(hashtable);
|
|
}
|
|
|
|
fastFree(hashtable);
|
|
}
|
|
};
|
|
|
|
Atomic<Hashtable*> hashtable;
|
|
Atomic<unsigned> numThreads;
|
|
|
|
// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
|
|
// memory usage per thread will still be less than 1KB.
|
|
const unsigned maxLoadFactor = 3;
|
|
|
|
const unsigned growthFactor = 2;
|
|
|
|
unsigned hashAddress(const void* address)
|
|
{
|
|
return WTF::PtrHash<const void*>::hash(address);
|
|
}
|
|
|
|
Hashtable* ensureHashtable()
|
|
{
|
|
for (;;) {
|
|
Hashtable* currentHashtable = hashtable.load();
|
|
|
|
if (currentHashtable)
|
|
return currentHashtable;
|
|
|
|
if (!currentHashtable) {
|
|
currentHashtable = Hashtable::create(maxLoadFactor);
|
|
if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
|
|
return currentHashtable;
|
|
}
|
|
|
|
Hashtable::destroy(currentHashtable);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
|
|
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
|
|
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
|
|
Vector<Bucket*> lockHashtable()
|
|
{
|
|
for (;;) {
|
|
Hashtable* currentHashtable = ensureHashtable();
|
|
|
|
ASSERT(currentHashtable);
|
|
|
|
// Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
|
|
// we can lock all of the buckets, not just the ones that are materialized.
|
|
Vector<Bucket*> buckets;
|
|
for (unsigned i = currentHashtable->size; i--;) {
|
|
Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];
|
|
|
|
for (;;) {
|
|
Bucket* bucket = bucketPointer.load();
|
|
|
|
if (!bucket) {
|
|
bucket = new Bucket();
|
|
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
|
|
delete bucket;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
buckets.append(bucket);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Now lock the buckets in the right order.
|
|
std::sort(buckets.begin(), buckets.end());
|
|
for (Bucket* bucket : buckets)
|
|
bucket->lock.lock();
|
|
|
|
// If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
|
|
// now.
|
|
if (hashtable.load() == currentHashtable)
|
|
return buckets;
|
|
|
|
// The hashtable rehashed. Unlock everything and try again.
|
|
for (Bucket* bucket : buckets)
|
|
bucket->lock.unlock();
|
|
}
|
|
}
|
|
|
|
void unlockHashtable(const Vector<Bucket*>& buckets)
|
|
{
|
|
for (Bucket* bucket : buckets)
|
|
bucket->lock.unlock();
|
|
}
|
|
|
|
// Rehash the hashtable to handle numThreads threads.
|
|
void ensureHashtableSize(unsigned numThreads)
|
|
{
|
|
// We try to ensure that the size of the hashtable used for thread queues is always large enough
|
|
// to avoid collisions. So, since we started a new thread, we may need to increase the size of the
|
|
// hashtable. This does just that. Note that we never free the old spine, since we never lock
|
|
// around spine accesses (i.e. the "hashtable" global variable).
|
|
|
|
// First do a fast check to see if rehashing is needed.
|
|
Hashtable* oldHashtable = hashtable.load();
|
|
if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
|
|
return;
|
|
}
|
|
|
|
// Seems like we *might* have to rehash, so lock the hashtable and try again.
|
|
Vector<Bucket*> bucketsToUnlock = lockHashtable();
|
|
|
|
// Check again, since the hashtable could have rehashed while we were locking it. Also,
|
|
// lockHashtable() creates an initial hashtable for us.
|
|
oldHashtable = hashtable.load();
|
|
RELEASE_ASSERT(oldHashtable);
|
|
if (static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
|
|
unlockHashtable(bucketsToUnlock);
|
|
return;
|
|
}
|
|
|
|
Vector<Bucket*> reusableBuckets = bucketsToUnlock;
|
|
|
|
// OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
|
|
// are placed into the vector in queue order.
|
|
Vector<ThreadData*> threadDatas;
|
|
for (Bucket* bucket : reusableBuckets) {
|
|
while (ThreadData* threadData = bucket->dequeue())
|
|
threadDatas.append(threadData);
|
|
}
|
|
|
|
unsigned newSize = numThreads * growthFactor * maxLoadFactor;
|
|
RELEASE_ASSERT(newSize > oldHashtable->size);
|
|
|
|
Hashtable* newHashtable = Hashtable::create(newSize);
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
|
|
for (ThreadData* threadData : threadDatas) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
|
|
unsigned hash = hashAddress(threadData->address);
|
|
unsigned index = hash % newHashtable->size;
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": index = ", index, "\n"));
|
|
Bucket* bucket = newHashtable->data[index].load();
|
|
if (!bucket) {
|
|
if (reusableBuckets.isEmpty())
|
|
bucket = new Bucket();
|
|
else
|
|
bucket = reusableBuckets.takeLast();
|
|
newHashtable->data[index].store(bucket);
|
|
}
|
|
|
|
bucket->enqueue(threadData);
|
|
}
|
|
|
|
// At this point there may be some buckets left unreused. This could easily happen if the
|
|
// number of enqueued threads right now is low but the high watermark of the number of threads
|
|
// enqueued was high. We place these buckets into the hashtable basically at random, just to
|
|
// make sure we don't leak them.
|
|
for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
|
|
Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
|
|
if (bucketPtr.load())
|
|
continue;
|
|
bucketPtr.store(reusableBuckets.takeLast());
|
|
}
|
|
|
|
// Since we increased the size of the hashtable, we should have exhausted our preallocated
|
|
// buckets by now.
|
|
ASSERT(reusableBuckets.isEmpty());
|
|
|
|
// OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
|
|
// roll. After we install the new hashtable, we can release all bucket locks.
|
|
|
|
bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
|
|
RELEASE_ASSERT(result);
|
|
|
|
unlockHashtable(bucketsToUnlock);
|
|
}
|
|
|
|
ThreadData::ThreadData()
|
|
: thread(Thread::current())
|
|
{
|
|
unsigned currentNumThreads;
|
|
for (;;) {
|
|
unsigned oldNumThreads = numThreads.load();
|
|
currentNumThreads = oldNumThreads + 1;
|
|
if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
|
|
break;
|
|
}
|
|
|
|
ensureHashtableSize(currentNumThreads);
|
|
}
|
|
|
|
ThreadData::~ThreadData()
|
|
{
|
|
for (;;) {
|
|
unsigned oldNumThreads = numThreads.load();
|
|
if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
|
|
break;
|
|
}
|
|
}
|
|
|
|
ThreadData* myThreadData()
|
|
{
|
|
static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
|
|
static std::once_flag initializeOnce;
|
|
std::call_once(
|
|
initializeOnce,
|
|
[] {
|
|
threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
|
|
});
|
|
|
|
RefPtr<ThreadData>& result = **threadData;
|
|
|
|
if (!result)
|
|
result = adoptRef(new ThreadData());
|
|
|
|
return result.get();
|
|
}
|
|
|
|
template<typename Functor>
|
|
bool enqueue(const void* address, const Functor& functor)
|
|
{
|
|
unsigned hash = hashAddress(address);
|
|
|
|
for (;;) {
|
|
Hashtable* myHashtable = ensureHashtable();
|
|
unsigned index = hash % myHashtable->size;
|
|
Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
|
|
Bucket* bucket;
|
|
for (;;) {
|
|
bucket = bucketPointer.load();
|
|
if (!bucket) {
|
|
bucket = new Bucket();
|
|
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
|
|
delete bucket;
|
|
continue;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
|
|
bucket->lock.lock();
|
|
|
|
// At this point the hashtable could have rehashed under us.
|
|
if (hashtable.load() != myHashtable) {
|
|
bucket->lock.unlock();
|
|
continue;
|
|
}
|
|
|
|
ThreadData* threadData = functor();
|
|
bool result;
|
|
if (threadData) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
|
|
bucket->enqueue(threadData);
|
|
result = true;
|
|
} else
|
|
result = false;
|
|
bucket->lock.unlock();
|
|
return result;
|
|
}
|
|
}
|
|
|
|
enum class BucketMode {
|
|
EnsureNonEmpty,
|
|
IgnoreEmpty
|
|
};
|
|
|
|
template<typename DequeueFunctor, typename FinishFunctor>
|
|
bool dequeue(
|
|
const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
|
|
const FinishFunctor& finishFunctor)
|
|
{
|
|
unsigned hash = hashAddress(address);
|
|
|
|
for (;;) {
|
|
Hashtable* myHashtable = ensureHashtable();
|
|
unsigned index = hash % myHashtable->size;
|
|
Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
|
|
Bucket* bucket = bucketPointer.load();
|
|
if (!bucket) {
|
|
if (bucketMode == BucketMode::IgnoreEmpty)
|
|
return false;
|
|
|
|
for (;;) {
|
|
bucket = bucketPointer.load();
|
|
if (!bucket) {
|
|
bucket = new Bucket();
|
|
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
|
|
delete bucket;
|
|
continue;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
bucket->lock.lock();
|
|
|
|
// At this point the hashtable could have rehashed under us.
|
|
if (hashtable.load() != myHashtable) {
|
|
bucket->lock.unlock();
|
|
continue;
|
|
}
|
|
|
|
bucket->genericDequeue(dequeueFunctor);
|
|
bool result = !!bucket->queueHead;
|
|
finishFunctor(result);
|
|
bucket->lock.unlock();
|
|
return result;
|
|
}
|
|
}
|
|
|
|
} // anonymous namespace
|
|
|
|
NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
|
|
const void* address,
|
|
const ScopedLambda<bool()>& validation,
|
|
const ScopedLambda<void()>& beforeSleep,
|
|
const TimeWithDynamicClockType& timeout)
|
|
{
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": parking.\n"));
|
|
|
|
ThreadData* me = myThreadData();
|
|
me->token = 0;
|
|
|
|
// Guard against someone calling parkConditionally() recursively from beforeSleep().
|
|
RELEASE_ASSERT(!me->address);
|
|
|
|
bool enqueueResult = enqueue(
|
|
address,
|
|
[&] () -> ThreadData* {
|
|
if (!validation())
|
|
return nullptr;
|
|
|
|
me->address = address;
|
|
return me;
|
|
});
|
|
|
|
if (!enqueueResult)
|
|
return ParkResult();
|
|
|
|
beforeSleep();
|
|
|
|
bool didGetDequeued;
|
|
{
|
|
MutexLocker locker(me->parkingLock);
|
|
while (me->address && timeout.nowWithSameClock() < timeout) {
|
|
me->parkingCondition.timedWait(
|
|
me->parkingLock, timeout.approximateWallTime());
|
|
|
|
// It's possible for the OS to decide not to wait. If it does that then it will also
|
|
// decide not to release the lock. If there's a bug in the time math, then this could
|
|
// result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
|
|
// spin.
|
|
me->parkingLock.unlock();
|
|
me->parkingLock.lock();
|
|
}
|
|
ASSERT(!me->address || me->address == address);
|
|
didGetDequeued = !me->address;
|
|
}
|
|
|
|
if (didGetDequeued) {
|
|
// Great! We actually got dequeued rather than the timeout expiring.
|
|
ParkResult result;
|
|
result.wasUnparked = true;
|
|
result.token = me->token;
|
|
return result;
|
|
}
|
|
|
|
// Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.
|
|
|
|
bool didDequeue = false;
|
|
dequeue(
|
|
address, BucketMode::IgnoreEmpty,
|
|
[&] (ThreadData* element, bool) {
|
|
if (element == me) {
|
|
didDequeue = true;
|
|
return DequeueResult::RemoveAndStop;
|
|
}
|
|
return DequeueResult::Ignore;
|
|
},
|
|
[] (bool) { });
|
|
|
|
// If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
|
|
// If didDequeue is false, then someone unparked us.
|
|
|
|
RELEASE_ASSERT(!me->nextInQueue);
|
|
|
|
// Make sure that no matter what, me->address is null after this point.
|
|
{
|
|
MutexLocker locker(me->parkingLock);
|
|
if (!didDequeue) {
|
|
// If we did not dequeue ourselves, then someone else did. They will set our address to
|
|
// null. We don't want to proceed until they do this, because otherwise, they may set
|
|
// our address to null in some distant future when we're already trying to wait for
|
|
// other things.
|
|
while (me->address)
|
|
me->parkingCondition.wait(me->parkingLock);
|
|
}
|
|
me->address = nullptr;
|
|
}
|
|
|
|
ParkResult result;
|
|
result.wasUnparked = !didDequeue;
|
|
if (!didDequeue) {
|
|
// If we were unparked then there should be a token.
|
|
result.token = me->token;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
|
|
{
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": unparking one.\n"));
|
|
|
|
UnparkResult result;
|
|
|
|
RefPtr<ThreadData> threadData;
|
|
result.mayHaveMoreThreads = dequeue(
|
|
address,
|
|
// Why is this here?
|
|
// FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
|
|
// without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
|
|
// some operation while holding the bucket lock, which usually goes into the finish func.
|
|
// But if that operation is a no-op, then it's not clear why we need this.
|
|
BucketMode::EnsureNonEmpty,
|
|
[&] (ThreadData* element, bool) {
|
|
if (element->address != address)
|
|
return DequeueResult::Ignore;
|
|
threadData = element;
|
|
result.didUnparkThread = true;
|
|
return DequeueResult::RemoveAndStop;
|
|
},
|
|
[] (bool) { });
|
|
|
|
if (!threadData) {
|
|
ASSERT(!result.didUnparkThread);
|
|
result.mayHaveMoreThreads = false;
|
|
return result;
|
|
}
|
|
|
|
ASSERT(threadData->address);
|
|
|
|
{
|
|
MutexLocker locker(threadData->parkingLock);
|
|
threadData->address = nullptr;
|
|
threadData->token = 0;
|
|
}
|
|
threadData->parkingCondition.signal();
|
|
|
|
return result;
|
|
}
|
|
|
|
NEVER_INLINE void ParkingLot::unparkOneImpl(
|
|
const void* address,
|
|
const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
|
|
{
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));
|
|
|
|
RefPtr<ThreadData> threadData;
|
|
bool timeToBeFair = false;
|
|
dequeue(
|
|
address,
|
|
BucketMode::EnsureNonEmpty,
|
|
[&] (ThreadData* element, bool passedTimeToBeFair) {
|
|
if (element->address != address)
|
|
return DequeueResult::Ignore;
|
|
threadData = element;
|
|
timeToBeFair = passedTimeToBeFair;
|
|
return DequeueResult::RemoveAndStop;
|
|
},
|
|
[&] (bool mayHaveMoreThreads) {
|
|
UnparkResult result;
|
|
result.didUnparkThread = !!threadData;
|
|
result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
|
|
if (timeToBeFair)
|
|
RELEASE_ASSERT(threadData);
|
|
result.timeToBeFair = timeToBeFair;
|
|
intptr_t token = callback(result);
|
|
if (threadData)
|
|
threadData->token = token;
|
|
});
|
|
|
|
if (!threadData)
|
|
return;
|
|
|
|
ASSERT(threadData->address);
|
|
|
|
{
|
|
MutexLocker locker(threadData->parkingLock);
|
|
threadData->address = nullptr;
|
|
}
|
|
// At this point, the threadData may die. Good thing we have a RefPtr<> on it.
|
|
threadData->parkingCondition.signal();
|
|
}
|
|
|
|
NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
|
|
{
|
|
if (!count)
|
|
return 0;
|
|
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));
|
|
|
|
Vector<RefPtr<ThreadData>, 8> threadDatas;
|
|
dequeue(
|
|
address,
|
|
// FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
|
|
// but that seems wrong.
|
|
BucketMode::IgnoreEmpty,
|
|
[&] (ThreadData* element, bool) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
|
|
if (element->address != address)
|
|
return DequeueResult::Ignore;
|
|
threadDatas.append(element);
|
|
if (threadDatas.size() == count)
|
|
return DequeueResult::RemoveAndStop;
|
|
return DequeueResult::RemoveAndContinue;
|
|
},
|
|
[] (bool) { });
|
|
|
|
for (RefPtr<ThreadData>& threadData : threadDatas) {
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
|
|
ASSERT(threadData->address);
|
|
{
|
|
MutexLocker locker(threadData->parkingLock);
|
|
threadData->address = nullptr;
|
|
}
|
|
threadData->parkingCondition.signal();
|
|
}
|
|
|
|
if (verbose)
|
|
dataLog(toString(Thread::current(), ": done unparking.\n"));
|
|
|
|
return threadDatas.size();
|
|
}
|
|
|
|
NEVER_INLINE void ParkingLot::unparkAll(const void* address)
|
|
{
|
|
unparkCount(address, UINT_MAX);
|
|
}
|
|
|
|
NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
|
|
{
|
|
Vector<Bucket*> bucketsToUnlock = lockHashtable();
|
|
|
|
Hashtable* currentHashtable = hashtable.load();
|
|
for (unsigned i = currentHashtable->size; i--;) {
|
|
Bucket* bucket = currentHashtable->data[i].load();
|
|
if (!bucket)
|
|
continue;
|
|
for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
|
|
callback(currentThreadData->thread.get(), currentThreadData->address);
|
|
}
|
|
|
|
unlockHashtable(bucketsToUnlock);
|
|
}
|
|
|
|
} // namespace WTF
|
|
|