805 lines
18 KiB
C++
805 lines
18 KiB
C++
/*
|
|
* Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
|
|
* Distributed under the terms of the MIT License.
|
|
*/
|
|
|
|
#include <map>
|
|
#include <new>
|
|
#include <set>
|
|
#include <string.h>
|
|
|
|
#include <AutoDeleter.h>
|
|
#include <Autolock.h>
|
|
#include <DataIO.h>
|
|
#include <MessagePrivate.h>
|
|
#include <MessengerPrivate.h>
|
|
#include <OS.h>
|
|
#include <TokenSpace.h>
|
|
#include <util/DoublyLinkedList.h>
|
|
|
|
#include <messaging.h>
|
|
|
|
#include "Debug.h"
|
|
#include "MessageDeliverer.h"
|
|
#include "Referenceable.h"
|
|
|
|
using std::map;
|
|
using std::nothrow;
|
|
using std::set;
|
|
|
|
// sDeliverer -- the singleton instance
|
|
MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
|
|
|
|
static const bigtime_t kRetryDelay = 100000; // 100 ms
|
|
|
|
// per port sanity limits
|
|
static const int32 kMaxMessagesPerPort = 10000;
|
|
static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB
|
|
|
|
|
|
// MessagingTargetSet
|
|
|
|
// destructor
|
|
MessagingTargetSet::~MessagingTargetSet()
|
|
{
|
|
}
|
|
|
|
|
|
// #pragma mark -
|
|
|
|
// DefaultMessagingTargetSet
|
|
|
|
// constructor
|
|
DefaultMessagingTargetSet::DefaultMessagingTargetSet(
|
|
const messaging_target *targets, int32 targetCount)
|
|
: MessagingTargetSet(),
|
|
fTargets(targets),
|
|
fTargetCount(targetCount),
|
|
fNextIndex(0)
|
|
{
|
|
}
|
|
|
|
// destructor
|
|
DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
|
|
{
|
|
}
|
|
|
|
// HasNext
|
|
bool
|
|
DefaultMessagingTargetSet::HasNext() const
|
|
{
|
|
return (fNextIndex < fTargetCount);
|
|
}
|
|
|
|
// Next
|
|
bool
|
|
DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
|
|
{
|
|
if (fNextIndex >= fTargetCount)
|
|
return false;
|
|
|
|
port = fTargets[fNextIndex].port;
|
|
token = fTargets[fNextIndex].token;
|
|
fNextIndex++;
|
|
|
|
return true;
|
|
}
|
|
|
|
// Rewind
|
|
void
|
|
DefaultMessagingTargetSet::Rewind()
|
|
{
|
|
fNextIndex = 0;
|
|
}
|
|
|
|
|
|
// #pragma mark -
|
|
|
|
// SingleMessagingTargetSet
|
|
|
|
// constructor
|
|
SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
|
|
: MessagingTargetSet(),
|
|
fAtBeginning(true)
|
|
{
|
|
BMessenger::Private messengerPrivate(target);
|
|
fPort = messengerPrivate.Port();
|
|
fToken = (messengerPrivate.IsPreferredTarget()
|
|
? B_PREFERRED_TOKEN : messengerPrivate.Token());
|
|
}
|
|
|
|
// constructor
|
|
SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
|
|
: MessagingTargetSet(),
|
|
fPort(port),
|
|
fToken(token),
|
|
fAtBeginning(true)
|
|
{
|
|
}
|
|
|
|
// destructor
|
|
SingleMessagingTargetSet::~SingleMessagingTargetSet()
|
|
{
|
|
}
|
|
|
|
// HasNext
|
|
bool
|
|
SingleMessagingTargetSet::HasNext() const
|
|
{
|
|
return fAtBeginning;
|
|
}
|
|
|
|
// Next
|
|
bool
|
|
SingleMessagingTargetSet::Next(port_id &port, int32 &token)
|
|
{
|
|
if (!fAtBeginning)
|
|
return false;
|
|
|
|
port = fPort;
|
|
token = fToken;
|
|
fAtBeginning = false;
|
|
|
|
return true;
|
|
}
|
|
|
|
// Rewind
|
|
void
|
|
SingleMessagingTargetSet::Rewind()
|
|
{
|
|
fAtBeginning = true;
|
|
}
|
|
|
|
|
|
// #pragma mark -
|
|
|
|
// Message
|
|
/*! \brief Encapsulates a message to be delivered.
|
|
|
|
Besides the flattened message it also stores the when the message was
|
|
created and when the delivery attempts shall time out.
|
|
*/
|
|
class MessageDeliverer::Message : public BReferenceable {
|
|
public:
|
|
Message(void *data, int32 dataSize, bigtime_t timeout)
|
|
: BReferenceable(),
|
|
fData(data),
|
|
fDataSize(dataSize),
|
|
fCreationTime(system_time()),
|
|
fBusy(false)
|
|
{
|
|
if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
|
|
fTimeoutTime = B_INFINITE_TIMEOUT;
|
|
else if (timeout <= 0)
|
|
fTimeoutTime = fCreationTime;
|
|
else
|
|
fTimeoutTime = fCreationTime + timeout;
|
|
}
|
|
|
|
~Message()
|
|
{
|
|
free(fData);
|
|
}
|
|
|
|
void *Data() const
|
|
{
|
|
return fData;
|
|
}
|
|
|
|
int32 DataSize() const
|
|
{
|
|
return fDataSize;
|
|
}
|
|
|
|
bigtime_t CreationTime() const
|
|
{
|
|
return fCreationTime;
|
|
}
|
|
|
|
bigtime_t TimeoutTime() const
|
|
{
|
|
return fTimeoutTime;
|
|
}
|
|
|
|
bool HasTimeout() const
|
|
{
|
|
return (fTimeoutTime < B_INFINITE_TIMEOUT);
|
|
}
|
|
|
|
void SetBusy(bool busy)
|
|
{
|
|
fBusy = busy;
|
|
}
|
|
|
|
bool IsBusy() const
|
|
{
|
|
return fBusy;
|
|
}
|
|
|
|
private:
|
|
void *fData;
|
|
int32 fDataSize;
|
|
bigtime_t fCreationTime;
|
|
bigtime_t fTimeoutTime;
|
|
bool fBusy;
|
|
};
|
|
|
|
// TargetMessage
|
|
/*! \brief Encapsulates a Message to be sent to a specific handler.
|
|
|
|
A TargetMessage is always associated with (i.e. queued in) a TargetPort.
|
|
While a Message stores only the message data and some timing info, this
|
|
object adds the token of a the target BHandler.
|
|
|
|
A Message can be referred to by more than one TargetMessage (when
|
|
broadcasting), but a TargetMessage is referred to exactly once, by
|
|
the TargetPort.
|
|
*/
|
|
class MessageDeliverer::TargetMessage
|
|
: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
|
|
public:
|
|
TargetMessage(Message *message, int32 token)
|
|
: fMessage(message),
|
|
fToken(token)
|
|
{
|
|
if (fMessage)
|
|
fMessage->AcquireReference();
|
|
}
|
|
|
|
~TargetMessage()
|
|
{
|
|
if (fMessage)
|
|
fMessage->ReleaseReference();
|
|
}
|
|
|
|
Message *GetMessage() const
|
|
{
|
|
return fMessage;
|
|
}
|
|
|
|
int32 Token() const
|
|
{
|
|
return fToken;
|
|
}
|
|
|
|
private:
|
|
Message *fMessage;
|
|
int32 fToken;
|
|
};
|
|
|
|
// TargetMessageHandle
|
|
/*! \brief A small wrapper for TargetMessage providing a complete order.
|
|
|
|
This class only exists to provide the comparison operators required to
|
|
put a TargetMessage into a set. The order implemented is by ascending by
|
|
timeout time (primary) and by TargetMessage pointer (secondary).
|
|
Hence TargetMessageHandles referring to the same TargetMessage are equal
|
|
(and only those).
|
|
*/
|
|
class MessageDeliverer::TargetMessageHandle {
|
|
public:
|
|
TargetMessageHandle(TargetMessage *message)
|
|
: fMessage(message)
|
|
{
|
|
}
|
|
|
|
TargetMessageHandle(const TargetMessageHandle &other)
|
|
: fMessage(other.fMessage)
|
|
{
|
|
}
|
|
|
|
TargetMessage *GetMessage() const
|
|
{
|
|
return fMessage;
|
|
}
|
|
|
|
TargetMessageHandle &operator=(const TargetMessageHandle &other)
|
|
{
|
|
fMessage = other.fMessage;
|
|
return *this;
|
|
}
|
|
|
|
bool operator==(const TargetMessageHandle &other) const
|
|
{
|
|
return (fMessage == other.fMessage);
|
|
}
|
|
|
|
bool operator!=(const TargetMessageHandle &other) const
|
|
{
|
|
return (fMessage != other.fMessage);
|
|
}
|
|
|
|
bool operator<(const TargetMessageHandle &other) const
|
|
{
|
|
bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
|
|
bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
|
|
if (timeout < otherTimeout)
|
|
return true;
|
|
if (timeout > otherTimeout)
|
|
return false;
|
|
return (fMessage < other.fMessage);
|
|
}
|
|
|
|
private:
|
|
TargetMessage *fMessage;
|
|
};
|
|
|
|
// TargetPort
|
|
/*! \brief Represents a full target port, queuing the not yet delivered
|
|
messages.
|
|
|
|
A TargetPort internally queues TargetMessages in the order the are to be
|
|
delivered. Furthermore the object maintains an ordered set of
|
|
TargetMessages that can timeout (in ascending order of timeout time), so
|
|
that timed out messages can be dropped easily.
|
|
*/
|
|
class MessageDeliverer::TargetPort {
|
|
public:
|
|
TargetPort(port_id portID)
|
|
: fPortID(portID),
|
|
fMessages(),
|
|
fMessageCount(0),
|
|
fMessageSize(0)
|
|
{
|
|
}
|
|
|
|
~TargetPort()
|
|
{
|
|
while (!fMessages.IsEmpty())
|
|
PopMessage();
|
|
}
|
|
|
|
port_id PortID() const
|
|
{
|
|
return fPortID;
|
|
}
|
|
|
|
status_t PushMessage(Message *message, int32 token)
|
|
{
|
|
PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
|
|
B_PRId32 ")\n", fPortID, message, token);
|
|
// create a target message
|
|
TargetMessage *targetMessage
|
|
= new(nothrow) TargetMessage(message, token);
|
|
if (!targetMessage)
|
|
return B_NO_MEMORY;
|
|
|
|
// push it
|
|
fMessages.Insert(targetMessage);
|
|
fMessageCount++;
|
|
fMessageSize += targetMessage->GetMessage()->DataSize();
|
|
|
|
// add it to the timeoutable messages, if it has a timeout
|
|
if (message->HasTimeout())
|
|
fTimeoutableMessages.insert(targetMessage);
|
|
|
|
_EnforceLimits();
|
|
|
|
return B_OK;
|
|
}
|
|
|
|
Message *PeekMessage(int32 &token) const
|
|
{
|
|
if (!fMessages.Head())
|
|
return NULL;
|
|
|
|
token = fMessages.Head()->Token();
|
|
return fMessages.Head()->GetMessage();
|
|
}
|
|
|
|
void PopMessage()
|
|
{
|
|
if (fMessages.Head()) {
|
|
PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
|
|
fPortID, fMessages.Head()->GetMessage());
|
|
_RemoveMessage(fMessages.Head());
|
|
}
|
|
}
|
|
|
|
void DropTimedOutMessages()
|
|
{
|
|
bigtime_t now = system_time();
|
|
|
|
while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
|
|
TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
|
|
if (message->GetMessage()->TimeoutTime() > now)
|
|
break;
|
|
|
|
PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
|
|
": message %p timed out\n", fPortID, message->GetMessage());
|
|
_RemoveMessage(message);
|
|
}
|
|
}
|
|
|
|
bool IsEmpty() const
|
|
{
|
|
return fMessages.IsEmpty();
|
|
}
|
|
|
|
private:
|
|
void _RemoveMessage(TargetMessage *message)
|
|
{
|
|
fMessages.Remove(message);
|
|
fMessageCount--;
|
|
fMessageSize -= message->GetMessage()->DataSize();
|
|
|
|
if (message->GetMessage()->HasTimeout())
|
|
fTimeoutableMessages.erase(message);
|
|
|
|
delete message;
|
|
}
|
|
|
|
void _EnforceLimits()
|
|
{
|
|
// message count
|
|
while (fMessageCount > kMaxMessagesPerPort) {
|
|
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
|
|
": hit maximum message count limit.\n", fPortID);
|
|
PopMessage();
|
|
}
|
|
|
|
// message size
|
|
while (fMessageSize > kMaxDataPerPort) {
|
|
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
|
|
": hit maximum message size limit.\n", fPortID);
|
|
PopMessage();
|
|
}
|
|
}
|
|
|
|
typedef DoublyLinkedList<TargetMessage> MessageList;
|
|
|
|
port_id fPortID;
|
|
MessageList fMessages;
|
|
int32 fMessageCount;
|
|
int32 fMessageSize;
|
|
set<TargetMessageHandle> fTimeoutableMessages;
|
|
};
|
|
|
|
// TargetPortMap
|
|
struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
|
|
};
|
|
|
|
|
|
// #pragma mark -
|
|
|
|
/*! \class MessageDeliverer
|
|
\brief Service for delivering messages, which retries the delivery as long
|
|
as the target port is full.
|
|
|
|
For the user of the service only the MessageDeliverer::DeliverMessage()
|
|
will be of interest. Some of them allow broadcasting a message to several
|
|
recepients.
|
|
|
|
The class maintains a TargetPort for each target port which was full at the
|
|
time a message was to be delivered to it. A TargetPort has a queue of
|
|
undelivered messages. A separate worker thread retries periodically to send
|
|
the yet undelivered messages to the respective target ports.
|
|
*/
|
|
|
|
// constructor
|
|
MessageDeliverer::MessageDeliverer()
|
|
: fLock("message deliverer"),
|
|
fTargetPorts(NULL),
|
|
fDelivererThread(-1),
|
|
fTerminating(false)
|
|
{
|
|
}
|
|
|
|
// destructor
|
|
MessageDeliverer::~MessageDeliverer()
|
|
{
|
|
fTerminating = true;
|
|
|
|
if (fDelivererThread >= 0) {
|
|
int32 result;
|
|
wait_for_thread(fDelivererThread, &result);
|
|
}
|
|
|
|
delete fTargetPorts;
|
|
}
|
|
|
|
// Init
|
|
status_t
|
|
MessageDeliverer::Init()
|
|
{
|
|
// create the target port map
|
|
fTargetPorts = new(nothrow) TargetPortMap;
|
|
if (!fTargetPorts)
|
|
return B_NO_MEMORY;
|
|
|
|
// spawn the deliverer thread
|
|
fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
|
|
"message deliverer", B_NORMAL_PRIORITY + 1, this);
|
|
if (fDelivererThread < 0)
|
|
return fDelivererThread;
|
|
|
|
// resume the deliverer thread
|
|
resume_thread(fDelivererThread);
|
|
|
|
return B_OK;
|
|
}
|
|
|
|
// CreateDefault
|
|
status_t
|
|
MessageDeliverer::CreateDefault()
|
|
{
|
|
if (sDeliverer)
|
|
return B_OK;
|
|
|
|
// create the deliverer
|
|
MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
|
|
if (!deliverer)
|
|
return B_NO_MEMORY;
|
|
|
|
// init it
|
|
status_t error = deliverer->Init();
|
|
if (error != B_OK) {
|
|
delete deliverer;
|
|
return error;
|
|
}
|
|
|
|
sDeliverer = deliverer;
|
|
return B_OK;
|
|
}
|
|
|
|
// DeleteDefault
|
|
void
|
|
MessageDeliverer::DeleteDefault()
|
|
{
|
|
if (sDeliverer) {
|
|
delete sDeliverer;
|
|
sDeliverer = NULL;
|
|
}
|
|
}
|
|
|
|
// Default
|
|
MessageDeliverer *
|
|
MessageDeliverer::Default()
|
|
{
|
|
return sDeliverer;
|
|
}
|
|
|
|
// DeliverMessage
|
|
/*! \brief Delivers a message to the supplied target.
|
|
|
|
The method tries to send the message right now (if there are not already
|
|
messages pending for the target port). If that fails due to a full target
|
|
port, the message is queued for later delivery.
|
|
|
|
\param message The message to be delivered.
|
|
\param target A BMessenger identifying the delivery target.
|
|
\param timeout If given, the message will be dropped, when it couldn't be
|
|
delivered after this amount of microseconds.
|
|
\return
|
|
- \c B_OK, if sending the message succeeded or if the target port was
|
|
full and the message has been queued,
|
|
- another error code otherwise.
|
|
*/
|
|
status_t
|
|
MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
|
|
bigtime_t timeout)
|
|
{
|
|
SingleMessagingTargetSet set(target);
|
|
return DeliverMessage(message, set, timeout);
|
|
}
|
|
|
|
// DeliverMessage
|
|
/*! \brief Delivers a message to the supplied targets.
|
|
|
|
The method tries to send the message right now to each of the given targets
|
|
(if there are not already messages pending for a target port). If that
|
|
fails due to a full target port, the message is queued for later delivery.
|
|
|
|
\param message The message to be delivered.
|
|
\param targets MessagingTargetSet providing the the delivery targets.
|
|
\param timeout If given, the message will be dropped, when it couldn't be
|
|
delivered after this amount of microseconds.
|
|
\return
|
|
- \c B_OK, if for each of the given targets sending the message succeeded
|
|
or if the target port was full and the message has been queued,
|
|
- another error code otherwise.
|
|
*/
|
|
status_t
|
|
MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
|
|
bigtime_t timeout)
|
|
{
|
|
if (!message)
|
|
return B_BAD_VALUE;
|
|
|
|
// flatten the message
|
|
BMallocIO mallocIO;
|
|
status_t error = message->Flatten(&mallocIO, NULL);
|
|
if (error < B_OK)
|
|
return error;
|
|
|
|
return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
|
|
timeout);
|
|
}
|
|
|
|
// DeliverMessage
|
|
/*! \brief Delivers a flattened message to the supplied targets.
|
|
|
|
The method tries to send the message right now to each of the given targets
|
|
(if there are not already messages pending for a target port). If that
|
|
fails due to a full target port, the message is queued for later delivery.
|
|
|
|
\param message The flattened message to be delivered. This may be a
|
|
flattened BMessage or KMessage.
|
|
\param messageSize The size of the flattened message buffer.
|
|
\param targets MessagingTargetSet providing the the delivery targets.
|
|
\param timeout If given, the message will be dropped, when it couldn't be
|
|
delivered after this amount of microseconds.
|
|
\return
|
|
- \c B_OK, if for each of the given targets sending the message succeeded
|
|
or if the target port was full and the message has been queued,
|
|
- another error code otherwise.
|
|
*/
|
|
status_t
|
|
MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
|
|
MessagingTargetSet &targets, bigtime_t timeout)
|
|
{
|
|
if (!messageData || messageSize <= 0)
|
|
return B_BAD_VALUE;
|
|
|
|
// clone the buffer
|
|
void *data = malloc(messageSize);
|
|
if (!data)
|
|
return B_NO_MEMORY;
|
|
memcpy(data, messageData, messageSize);
|
|
|
|
// create a Message
|
|
Message *message = new(nothrow) Message(data, messageSize, timeout);
|
|
if (!message) {
|
|
free(data);
|
|
return B_NO_MEMORY;
|
|
}
|
|
BReference<Message> _(message, true);
|
|
|
|
// add the message to the respective target ports
|
|
BAutolock locker(fLock);
|
|
for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
|
|
port_id portID;
|
|
int32 token;
|
|
targets.Next(portID, token);
|
|
|
|
// get the target port
|
|
TargetPort *port = _GetTargetPort(portID, true);
|
|
if (!port)
|
|
return B_NO_MEMORY;
|
|
|
|
// try sending the message, if there are no queued messages yet
|
|
if (port->IsEmpty()) {
|
|
status_t error = _SendMessage(message, portID, token);
|
|
// if the message was delivered OK, we're done with the target
|
|
if (error == B_OK) {
|
|
_PutTargetPort(port);
|
|
continue;
|
|
}
|
|
|
|
// if the port is not full, but an error occurred, we skip this target
|
|
if (error != B_WOULD_BLOCK) {
|
|
_PutTargetPort(port);
|
|
if (targetIndex == 0 && !targets.HasNext())
|
|
return error;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// add the message
|
|
status_t error = port->PushMessage(message, token);
|
|
_PutTargetPort(port);
|
|
if (error != B_OK)
|
|
return error;
|
|
}
|
|
|
|
return B_OK;
|
|
}
|
|
|
|
// _GetTargetPort
|
|
MessageDeliverer::TargetPort *
|
|
MessageDeliverer::_GetTargetPort(port_id portID, bool create)
|
|
{
|
|
// get the port from the map
|
|
TargetPortMap::iterator it = fTargetPorts->find(portID);
|
|
if (it != fTargetPorts->end())
|
|
return it->second;
|
|
|
|
if (!create)
|
|
return NULL;
|
|
|
|
// create a port
|
|
TargetPort *port = new(nothrow) TargetPort(portID);
|
|
if (!port)
|
|
return NULL;
|
|
(*fTargetPorts)[portID] = port;
|
|
|
|
return port;
|
|
}
|
|
|
|
// _PutTargetPort
|
|
void
|
|
MessageDeliverer::_PutTargetPort(TargetPort *port)
|
|
{
|
|
if (!port)
|
|
return;
|
|
|
|
if (port->IsEmpty()) {
|
|
fTargetPorts->erase(port->PortID());
|
|
delete port;
|
|
}
|
|
}
|
|
|
|
// _SendMessage
|
|
status_t
|
|
MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
|
|
{
|
|
status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
|
|
message->DataSize(), portID, token, 0);
|
|
//PRINT("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
|
|
//message, portID, token, error);
|
|
return error;
|
|
}
|
|
|
|
// _DelivererThreadEntry
|
|
int32
|
|
MessageDeliverer::_DelivererThreadEntry(void *data)
|
|
{
|
|
return ((MessageDeliverer*)data)->_DelivererThread();
|
|
}
|
|
|
|
// _DelivererThread
|
|
int32
|
|
MessageDeliverer::_DelivererThread()
|
|
{
|
|
while (!fTerminating) {
|
|
snooze(kRetryDelay);
|
|
if (fTerminating)
|
|
break;
|
|
|
|
// iterate through all target ports and try sending the messages
|
|
BAutolock _(fLock);
|
|
for (TargetPortMap::iterator it = fTargetPorts->begin();
|
|
it != fTargetPorts->end();) {
|
|
TargetPort *port = it->second;
|
|
bool portError = false;
|
|
|
|
port->DropTimedOutMessages();
|
|
|
|
// try sending all messages
|
|
int32 token;
|
|
while (Message *message = port->PeekMessage(token)) {
|
|
status_t error = B_OK;
|
|
// if (message->TimeoutTime() > system_time()) {
|
|
error = _SendMessage(message, port->PortID(), token);
|
|
// } else {
|
|
// // timeout, drop message
|
|
// PRINT("MessageDeliverer::_DelivererThread(): port %ld, "
|
|
// "message %p timed out\n", port->PortID(), message);
|
|
// }
|
|
|
|
if (error == B_OK) {
|
|
port->PopMessage();
|
|
} else if (error == B_WOULD_BLOCK) {
|
|
// no luck yet -- port is still full
|
|
break;
|
|
} else {
|
|
// unexpected error -- probably the port is gone
|
|
portError = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// next port
|
|
if (portError || port->IsEmpty()) {
|
|
TargetPortMap::iterator oldIt = it;
|
|
++it;
|
|
delete port;
|
|
fTargetPorts->erase(oldIt);
|
|
} else
|
|
++it;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|