/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include <wtf/ParkingLot.h>

#include <wtf/DataLog.h>
#include <wtf/HashFunctions.h>
#include <wtf/StringPrintStream.h>
#include <wtf/ThreadSpecific.h>
#include <wtf/Threading.h>
#include <wtf/Vector.h>
#include <wtf/WeakRandom.h>
#include <wtf/WordLock.h>

namespace WTF {

namespace {

static constexpr bool verbose = false;

struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
    WTF_MAKE_FAST_ALLOCATED;
public:
    ThreadData();
    ~ThreadData();

    Ref<Thread> thread;

    Mutex parkingLock;
    ThreadCondition parkingCondition;

    const void* address { nullptr };
    ThreadData* nextInQueue { nullptr };

    intptr_t token { 0 };
};

enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};

struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    Bucket()
        : random(static_cast<unsigned>(bitwise_cast<uintptr_t>(this))) // Cannot use default seed since that recurses into Lock.
    {
    }

    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);

        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));

        if (!queueHead) {
            if (verbose)
                dataLog(toString(Thread::current(), ": empty.\n"));
            return;
        }

        // This loop is a very clever abomination. The induction variables are the pointer to the
        // pointer to the current node, and the pointer to the previous node. This gives us everything
        // we need to both proceed forward to the next node, and to remove nodes while maintaining the
        // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
        // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
        // we'd want queueTail to be set to nullptr. This works because:
        //
        //     currentPtr == &queueHead
        //     previous == nullptr
        //
        // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
        // that used to point to this node to instead point to this node's successor. Another example:
        // if we were at the second node in the queue, then we'd have:
        //
        //     currentPtr == &queueHead->nextInQueue
        //     previous == queueHead
        //
        // If this node is not equal to queueTail, then removing it simply means making
        // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
        // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
        // queueTail to previous, which in this case is queueHead - thus making the queue look like a
        // proper one-element queue with queueHead == queueTail.
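        //
        // As a minimal standalone sketch of the same pointer-to-pointer removal technique
        // (the Node/head/tail/shouldRemove names are illustrative, not part of this file),
        // removing matching nodes from a singly linked queue while keeping the tail pointer
        // correct looks like:
        //
        //     Node** currentPtr = &head;
        //     Node* previous = nullptr;
        //     while (Node* current = *currentPtr) {
        //         if (shouldRemove(current)) {
        //             if (current == tail)
        //                 tail = previous;
        //             *currentPtr = current->next; // Unlink without touching `previous`.
        //             current->next = nullptr;
        //         } else {
        //             previous = current;
        //             currentPtr = &current->next;
        //         }
        //     }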
        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;

        MonotonicTime time = MonotonicTime::now();
        bool timeToBeFair = false;
        if (time > nextFairTime)
            timeToBeFair = true;

        bool didDequeue = false;

        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current, timeToBeFair);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndStop:
                shouldContinue = false;
                FALLTHROUGH;
            case DequeueResult::RemoveAndContinue:
                if (verbose)
                    dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                if (current == queueTail)
                    queueTail = previous;
                didDequeue = true;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                break;
            }
        }

        if (timeToBeFair && didDequeue)
            nextFairTime = time + Seconds::fromMilliseconds(random.get());

        ASSERT(!!queueHead == !!queueTail);
    }

    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element, bool) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
    // this lock.
    WordLock lock;

    MonotonicTime nextFairTime;

    WeakRandom random;

    // Put some distance between buckets in memory. This is one of several mitigations against false
    // sharing.
    char padding[64];
};

struct Hashtable;

// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
WordLock hashtablesLock;

struct Hashtable {
    unsigned size;
    Atomic<Bucket*> data[1];

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);

        Hashtable* result = static_cast<Hashtable*>(
            fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
        result->size = size;

        {
            // This is not fast and it's not data-access parallel, but that's fine, because
            // hashtable resizing is guaranteed to be rare and it will never happen in steady
            // state.
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(result);
        }

        return result;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            // This is not fast, but that's OK. See comment in create().
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }

        fastFree(hashtable);
    }
};

Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;

// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
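//
// As a rough, illustrative sanity check of that claim (the ~128-byte sizeof(Bucket) figure is
// an assumption, not something this file computes): a fully grown table holds about
// numThreads * growthFactor * maxLoadFactor = numThreads * 6 buckets, so roughly
// 6 * 128 bytes = 768 bytes of bucket storage per thread, which is indeed under 1KB.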
const unsigned maxLoadFactor = 3;

const unsigned growthFactor = 2;

unsigned hashAddress(const void* address)
{
    return WTF::PtrHash<const void*>::hash(address);
}

Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();

        if (currentHashtable)
            return currentHashtable;

        if (!currentHashtable) {
            currentHashtable = Hashtable::create(maxLoadFactor);
            if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
                if (verbose)
                    dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
                return currentHashtable;
            }

            Hashtable::destroy(currentHashtable);
        }
    }
}

// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
        // we can lock all of the buckets, not just the ones that are materialized.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];

            for (;;) {
                Bucket* bucket = bucketPointer.load();

                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }

                buckets.append(bucket);
                break;
            }
        }

        // Now lock the buckets in the right order.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
        // now.
        if (hashtable.load() == currentHashtable)
            return buckets;

        // The hashtable rehashed. Unlock everything and try again.
        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}

void unlockHashtable(const Vector<Bucket*>& buckets)
{
    for (Bucket* bucket : buckets)
        bucket->lock.unlock();
}
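
// A minimal sketch of the deadlock-avoidance idiom that lockHashtable() relies on (the names
// here are illustrative, not part of this file): whenever several locks of the same kind must
// be held at once, acquire them in one globally agreed order - here, ascending pointer order:
//
//     std::sort(locks.begin(), locks.end()); // Agree on an order.
//     for (WordLock* lock : locks)           // Acquire in that order.
//         lock->lock();
//
// Two threads that both follow this protocol can never each hold a lock the other is waiting
// for, so the acquisition cannot deadlock.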
// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
{
    // We try to ensure that the size of the hashtable used for thread queues is always large enough
    // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
    // hashtable. This does just that. Note that we never free the old spine, since we never lock
    // around spine accesses (i.e. the "hashtable" global variable).

    // First do a fast check to see if rehashing is needed.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Seems like we *might* have to rehash, so lock the hashtable and try again.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Check again, since the hashtable could have rehashed while we were locking it. Also,
    // lockHashtable() creates an initial hashtable for us.
    oldHashtable = hashtable.load();
    RELEASE_ASSERT(oldHashtable);
    if (static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
    // are placed into the vector in queue order.
    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);

    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(Thread::current(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }

        bucket->enqueue(threadData);
    }

    // At this point there may be some buckets left unreused. This could easily happen if the
    // number of enqueued threads right now is low but the high watermark of the number of threads
    // enqueued was high. We place these buckets into the hashtable basically at random, just to
    // make sure we don't leak them.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }

    // Since we increased the size of the hashtable, we should have exhausted our preallocated
    // buckets by now.
    ASSERT(reusableBuckets.isEmpty());

    // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
    // roll. After we install the new hashtable, we can release all bucket locks.
    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
    RELEASE_ASSERT(result);

    unlockHashtable(bucketsToUnlock);
}
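
// The shape of ensureHashtableSize() is the classic double-checked resize. A minimal sketch of
// the pattern (the helper names are illustrative, not part of this file):
//
//     if (!needsResize())            // Cheap, racy check; usually says no.
//         return;
//     auto locks = lockEverything(); // Serialize against all concurrent users.
//     if (!needsResize()) {          // Re-check under the locks; someone may have resized.
//         unlockEverything(locks);
//         return;
//     }
//     doResize();                    // Safe: we hold every lock and re-validated the condition.
//     unlockEverything(locks);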
ThreadData::ThreadData()
    : thread(Thread::current())
{
    unsigned currentNumThreads;
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        currentNumThreads = oldNumThreads + 1;
        if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
            break;
    }
    ensureHashtableSize(currentNumThreads);
}

ThreadData::~ThreadData()
{
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
            break;
    }
}

ThreadData* myThreadData()
{
    static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
        });

    RefPtr<ThreadData>& result = **threadData;

    if (!result)
        result = adoptRef(new ThreadData());

    return result.get();
}

template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}

enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};

template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}

} // anonymous namespace
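
// A hedged usage sketch of the API implemented below (ExampleLock and its members are
// illustrative, not part of WTF): a one-byte lock in the style this class was built for.
// compareAndPark() re-checks the value under the bucket lock, so a wakeup that lands between
// the failed CAS and the park cannot be lost.
//
//     struct ExampleLock {
//         Atomic<uint8_t> isLocked { 0 };
//
//         void lock()
//         {
//             while (!isLocked.compareExchangeWeak(0, 1))
//                 ParkingLot::compareAndPark(&isLocked, 1); // Sleep only if still locked.
//         }
//
//         void unlock()
//         {
//             isLocked.store(0);
//             ParkingLot::unparkOne(&isLocked); // Wake at most one waiter, if any.
//         }
//     };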
NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    const TimeWithDynamicClockType& timeout)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": parking.\n"));

    ThreadData* me = myThreadData();
    me->token = 0;

    // Guard against someone calling parkConditionally() recursively from beforeSleep().
    RELEASE_ASSERT(!me->address);

    bool enqueueResult = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!enqueueResult)
        return ParkResult();

    beforeSleep();

    bool didGetDequeued;
    {
        MutexLocker locker(me->parkingLock);
        while (me->address && timeout.nowWithSameClock() < timeout) {
            me->parkingCondition.timedWait(
                me->parkingLock, timeout.approximateWallTime());

            // It's possible for the OS to decide not to wait. If it does that then it will also
            // decide not to release the lock. If there's a bug in the time math, then this could
            // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
            // spin.
            me->parkingLock.unlock();
            me->parkingLock.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }

    if (didGetDequeued) {
        // Great! We actually got dequeued rather than the timeout expiring.
        ParkResult result;
        result.wasUnparked = true;
        result.token = me->token;
        return result;
    }

    // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.

    bool didDequeue = false;
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (element == me) {
                didDequeue = true;
                return DequeueResult::RemoveAndStop;
            }
            return DequeueResult::Ignore;
        },
        [] (bool) { });

    // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
    // If didDequeue is false, then someone unparked us.

    RELEASE_ASSERT(!me->nextInQueue);

    // Make sure that no matter what, me->address is null after this point.
    {
        MutexLocker locker(me->parkingLock);
        if (!didDequeue) {
            // If we did not dequeue ourselves, then someone else did. They will set our address to
            // null. We don't want to proceed until they do this, because otherwise, they may set
            // our address to null in some distant future when we're already trying to wait for
            // other things.
            while (me->address)
                me->parkingCondition.wait(me->parkingLock);
        }
        me->address = nullptr;
    }

    ParkResult result;
    result.wasUnparked = !didDequeue;
    if (!didDequeue) {
        // If we were unparked then there should be a token.
        result.token = me->token;
    }
    return result;
}

NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one.\n"));

    UnparkResult result;

    RefPtr<ThreadData> threadData;
    result.mayHaveMoreThreads = dequeue(
        address,
        // Why is this here?
        // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
        // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
        // some operation while holding the bucket lock, which usually goes into the finish func.
        // But if that operation is a no-op, then it's not clear why we need this.
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
        threadData->token = 0;
    }
    threadData->parkingCondition.signal();

    return result;
}
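
// A hedged sketch of how the callback flavor below can be used for fair handoff (the `word`
// lock state and the token values are illustrative, not part of WTF):
//
//     ParkingLot::unparkOne(
//         &word,
//         [&] (ParkingLot::UnparkResult result) -> intptr_t {
//             if (result.didUnparkThread && result.timeToBeFair)
//                 return 1;      // Keep the lock marked held; the token tells the woken thread it owns it.
//             word.store(0);     // Barging path: release the lock so anyone can grab it.
//             return 0;
//         });
//
// The callback runs while the bucket lock is held, so its decision and the dequeue are atomic
// with respect to other park/unpark operations on this address.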
NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address, const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));

    RefPtr<ThreadData> threadData;
    bool timeToBeFair = false;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool passedTimeToBeFair) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            timeToBeFair = passedTimeToBeFair;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            if (timeToBeFair)
                RELEASE_ASSERT(threadData);
            result.timeToBeFair = timeToBeFair;
            intptr_t token = callback(result);
            if (threadData)
                threadData->token = token;
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
    threadData->parkingCondition.signal();
}

NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
{
    if (!count)
        return 0;

    if (verbose)
        dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));

    Vector<RefPtr<ThreadData>, 8> threadDatas;
    dequeue(
        address,
        // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
        // but that seems wrong.
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (verbose)
                dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            if (threadDatas.size() == count)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

    for (RefPtr<ThreadData>& threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            MutexLocker locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.signal();
    }

    if (verbose)
        dataLog(toString(Thread::current(), ": done unparking.\n"));

    return threadDatas.size();
}

NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    unparkCount(address, UINT_MAX);
}

NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
{
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    Hashtable* currentHashtable = hashtable.load();
    for (unsigned i = currentHashtable->size; i--;) {
        Bucket* bucket = currentHashtable->data[i].load();
        if (!bucket)
            continue;
        for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
            callback(currentThreadData->thread.get(), currentThreadData->address);
    }

    unlockHashtable(bucketsToUnlock);
}

} // namespace WTF