haiku/src/servers/app/DelayedMessage.cpp

874 lines
17 KiB
C++

/*
* Copyright 2015, Haiku.
* Distributed under the terms of the MIT License.
*
* Authors:
* Joseph Groover <looncraz@looncraz.net>
*/
#include "DelayedMessage.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <Autolock.h>
#include <String.h>
#include <LinkSender.h>
#include <ServerProtocol.h>
// DelayedMessageSender constants

//! Port code that makes the sender thread recompute its wakeup timeout.
static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
//! Port code that makes the sender thread leave its message loop.
static const int32 kExitMessage = kWakeupMessage + 1;
//! Name passed to spawn_thread() for the sender thread.
static const char* const kName = "DMT is here for you, eventually...";
//! Priority passed to spawn_thread() for the sender thread.
static const int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
//! Capacity passed to create_port() for the sender's wakeup port.
static const int32 kPortCapacity = 10;
//! Data attachment structure.
/*! Describes one piece of data attached to a delayed message. The
constructor only records the caller's buffer; \c data stays NULL until
DelayedMessageData::CopyData() allocates a private copy, which the
destructor releases with free().
*/
struct Attachment {
Attachment(const void* data, size_t size);
~Attachment();
// Caller-provided source buffer (not owned).
const void* constData;
// malloc()'d copy of the source buffer, NULL until copied.
void* data;
// Size of the buffer in bytes.
size_t size;
};
typedef BObjectList<Attachment> AttachmentList;
/*! \class ScheduledMessage
\brief Responsible for sending of delayed message.

Holds the DelayedMessageData obtained from DelayedMessage::HandOff() and
deletes it on destruction.
*/
class ScheduledMessage {
public:
// Takes over the data of \a message; fData is NULL if the handoff failed.
ScheduledMessage(DelayedMessage& message);
~ScheduledMessage();
// Number of targets still registered in the data (0 without data).
int32 CountTargets() const;
// NOTE(review): no definition of Finalize() is visible in this part of the
// file - confirm whether it is implemented elsewhere or can be removed.
void Finalize();
// Scheduled delivery time stored in the data (0 without data).
bigtime_t ScheduledTime() const;
// Sends to the remaining targets and returns the number of successful sends.
int32 SendMessage();
// True when data is present and reports itself as valid.
bool IsValid() const;
// Tries to merge \a message into this pending message.
bool Merge(DelayedMessage& message);
// Sends the code and all attachments to a single port.
status_t SendMessageToPort(port_id port);
// Compares scheduled times; false when either message is invalid.
bool operator<(const ScheduledMessage& other) const;
// Message data deleted by the destructor; may be NULL.
DelayedMessageData* fData;
};
/*! \class DelayedMessageSender DelayedMessageSender.h
\brief Responsible for scheduling and sending of delayed messages

Owns a thread and a port: the thread waits on the port until the next
scheduled wakeup time and then sends the messages that are due.
*/
class DelayedMessageSender {
public:
explicit DelayedMessageSender();
~DelayedMessageSender();
// Merges \a message into a pending one or adds it to the schedule.
status_t ScheduleMessage (DelayedMessage& message);
// Number of messages currently waiting in the schedule.
int32 CountDelayedMessages() const;
// Running total of successful sends.
int64 CountSentMessages() const;
private:
// Body of the sender thread.
void _MessageLoop();
// Sends due messages; must only be called from the sender thread.
int32 _SendDelayedMessages();
// Thread entry point, forwards to _MessageLoop().
static int32 _thread_func(void* sender);
// Updates the wakeup time and pokes the sender thread through fPort.
void _Wakeup(bigtime_t whatTime);
private:
typedef BObjectList<ScheduledMessage> ScheduledList;
// NOTE: the constructor's initializer list relies on this declaration
// order, do not reorder the members below.
// Guards fMessages.
mutable BLocker fLock;
// Pending messages.
ScheduledList fMessages;
// Next wakeup time of the sender thread, accessed with atomic_*64().
bigtime_t fScheduledWakeup;
// Set to 1 by _Wakeup() when sending the wakeup message timed out.
int32 fWakeupRetry;
thread_id fThread;
port_id fPort;
// Number of successful sends, accessed with atomic_*64().
mutable int64 fSentCount;
};
// The single sender instance used by DelayedMessage::Flush().
DelayedMessageSender gDelayedMessageSender;
/*! \class DelayedMessageData DelayedMessageSender.h
\brief Owns DelayedMessage data, allocates memory and copies data only
when needed.
*/
class DelayedMessageData {
typedef BObjectList<port_id> PortList;
typedef void(*FailureCallback)(int32 code, port_id port, void* data);
public:
// \a delay is added to system_time() unless \a isSpecificTime is true.
DelayedMessageData(int32 code, bigtime_t delay,
bool isSpecificTime);
~DelayedMessageData();
// Adds \a port unless it is invalid (<= 0) or already a target.
bool AddTarget(port_id port);
void RemoveTarget(port_id port);
int32 CountTargets() const;
// Adds all targets of \a other that are not yet targets of this object.
void MergeTargets(DelayedMessageData* other);
// Allocates private copies of all attachments and marks the data valid.
bool CopyData();
// Tries to merge \a other into this object according to the merge mode.
bool MergeData(DelayedMessageData* other);
// Only true after a successful CopyData().
bool IsValid() const;
// Records an attachment; the data is not copied until CopyData().
status_t Attach(const void* data, size_t size);
// Decides whether attachment \a index of two messages matches for merging.
bool Compare(Attachment* one, Attachment* two,
int32 index);
void SetMerge(DMMergeMode mode, uint32 mask);
// Invokes the failure callback, if one is set.
void SendFailed(port_id port);
void SetFailureCallback(FailureCallback callback,
void* data);
// Accessors.
int32& Code() {return fCode;}
const int32& Code() const {return fCode;}
bigtime_t& ScheduledTime() {return fScheduledTime;}
const bigtime_t& ScheduledTime() const {return fScheduledTime;}
AttachmentList& Attachments() {return fAttachments;}
const AttachmentList& Attachments() const {return fAttachments;}
PortList& Targets() {return fTargets;}
const PortList& Targets() const {return fTargets;}
private:
// Data members.
// Message code passed to LinkSender::StartMessage() when sending.
int32 fCode;
// Time at which the message is due.
bigtime_t fScheduledTime;
// Set by CopyData() once all attachments have been copied.
bool fValid;
AttachmentList fAttachments;
PortList fTargets;
DMMergeMode fMergeMode;
// Bit mask selecting attachments by index, or DM_DATA_DEFAULT.
uint32 fMergeMask;
FailureCallback fFailureCallback;
// Opaque pointer passed back to the failure callback.
void* fFailureData;
};
// #pragma mark -
/*!	Creates the message data for \a code. Delays shorter than
	DM_MINIMUM_DELAY are raised to DM_MINIMUM_DELAY. The allocation uses
	std::nothrow, so fData may be NULL afterwards; all other methods check
	for that.
*/
DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
	bool isSpecificTime)
	:
	fData(new(std::nothrow) DelayedMessageData(code,
		delay >= DM_MINIMUM_DELAY ? delay : DM_MINIMUM_DELAY,
		isSpecificTime)),
	fHandedOff(false)
{
}
/*!	Deletes the message data unless it has been handed off. After a handoff
	the ScheduledMessage that received the data deletes it.
*/
DelayedMessage::~DelayedMessage()
{
	if (fHandedOff)
		return;

	// Never scheduled: the message is canceled and we still own the data.
	delete fData;
}
/*!	Adds \a port as a target of this message.
	\return \c false if there is no data, the data has already been handed
		off, or the data object rejected the port.
*/
bool
DelayedMessage::AddTarget(port_id port)
{
	const bool modifiable = fData != NULL && !fHandedOff;
	return modifiable && fData->AddTarget(port);
}
/*!	Sets the merge mode and mask of the message. Ignored when there is no
	data or the data has already been handed off.
*/
void
DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
{
	if (fData != NULL && !fHandedOff)
		fData->SetMerge(mode, match);
}
/*!	Registers \a callback, which receives the message code, the failing
	port and \a data when a send fails. Ignored when there is no data or the
	data has already been handed off.
*/
void
DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
	void* data)
{
	if (fData != NULL && !fHandedOff)
		fData->SetFailureCallback(callback, data);
}
/*!	Attach data to message. Memory is not allocated nor copied until
	handoff, so \a data has to stay valid until the message is flushed.
	\return \c B_NO_MEMORY without message data, \c B_ERROR after the
		handoff, \c B_BAD_VALUE for a NULL or empty buffer, otherwise the
		result of DelayedMessageData::Attach().
*/
status_t
DelayedMessage::Attach(const void* data, size_t size)
{
	status_t result;

	if (fData == NULL)
		result = B_NO_MEMORY;
	else if (fHandedOff)
		result = B_ERROR;
	else if (data == NULL || size == 0)
		result = B_BAD_VALUE;
	else
		result = fData->Attach(data, size);

	return result;
}
/*!	Hands the message to the global sender for scheduling.
	\return \c B_NO_MEMORY without message data, \c B_ERROR if the data was
		already handed off, \c B_BAD_VALUE without any target, otherwise the
		result of DelayedMessageSender::ScheduleMessage().
*/
status_t
DelayedMessage::Flush()
{
	if (fData == NULL)
		return B_NO_MEMORY;

	if (fHandedOff)
		return B_ERROR;

	const bool hasTargets = fData->CountTargets() != 0;
	if (!hasTargets)
		return B_BAD_VALUE;

	return gDelayedMessageSender.ScheduleMessage(*this);
}
/*!	Copies the attached data and releases the message data to the caller.
	The handoff happens upon scheduling, so data is only copied for messages
	that are actually scheduled; canceled messages stay cheap.
	\return The message data, or NULL if there is none, it was already
		handed off, or copying the attachments failed.
*/
DelayedMessageData*
DelayedMessage::HandOff()
{
	if (fData == NULL || fHandedOff)
		return NULL;

	if (!fData->CopyData())
		return NULL;

	fHandedOff = true;
	return fData;
}
// #pragma mark -
/*!	Records the caller's buffer and its size. Nothing is allocated or copied
	here; \c data remains NULL until the buffer is copied.
*/
Attachment::Attachment(const void* sourceData, size_t byteCount)
	:
	constData(sourceData),
	data(NULL),
	size(byteCount)
{
}
//! Releases the private copy, if any. constData is never freed here.
Attachment::~Attachment()
{
// data is NULL until it has been copied; free(NULL) is a no-op.
free(data);
}
// #pragma mark -
/*!	Initializes the data for message \a code.
	\param code The message code to send.
	\param delay Either an absolute time (if \a isSpecificTime is \c true)
		or a delay that is added to the current system_time().
	\param isSpecificTime Selects how \a delay is interpreted.
*/
DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
	bool isSpecificTime)
	:
	fCode(code),
	fScheduledTime(isSpecificTime ? delay : delay + system_time()),
	fValid(false),
	fAttachments(3, true),
	fTargets(4, true),
	fMergeMode(DM_NO_MERGE),
	fMergeMask(DM_DATA_DEFAULT),
	fFailureCallback(NULL),
	fFailureData(NULL)
{
}
// Nothing to release by hand: the attachment and target lists are
// constructed with 'true' as second argument (presumably the owning flag
// of BObjectList - confirm), so they are expected to delete their items.
DelayedMessageData::~DelayedMessageData()
{
}
/*!	Adds \a port to the list of targets.
	\return \c true if the port was added, \c false if the port is invalid
		(<= 0), is already a target, or memory could not be allocated.
*/
bool
DelayedMessageData::AddTarget(port_id port)
{
	if (port <= 0)
		return false;

	// check for duplicates:
	for (int32 index = 0; index < fTargets.CountItems(); ++index) {
		if (port == *fTargets.ItemAt(index))
			return false;
	}

	// Never store a NULL entry: all users of the list dereference its items
	// without checking.
	port_id* target = new(std::nothrow) port_id(port);
	if (target == NULL)
		return false;

	if (!fTargets.AddItem(target)) {
		// The list did not take the item, so it is still ours to release.
		delete target;
		return false;
	}

	return true;
}
/*!	Removes the target equal to \a port, if there is one. Ports are stored
	as heap-allocated values, so the match is found by value and the stored
	pointer is handed to RemoveItem().
*/
void
DelayedMessageData::RemoveTarget(port_id port)
{
	if (port == B_BAD_PORT_ID)
		return;

	const int32 count = fTargets.CountItems();
	for (int32 i = 0; i < count; i++) {
		port_id* candidate = fTargets.ItemAt(i);
		if (*candidate != port)
			continue;

		// At most one entry is removed, so the cached count stays valid.
		fTargets.RemoveItem(candidate, true);
		break;
	}
}
//! Returns the number of target ports currently in the list.
int32
DelayedMessageData::CountTargets() const
{
return fTargets.CountItems();
}
/*!	Adds every target of \a other to this object's targets. \a other must
	not be NULL.
*/
void
DelayedMessageData::MergeTargets(DelayedMessageData* other)
{
	PortList& incoming = other->fTargets;

	int32 position = 0;
	while (position < incoming.CountItems()) {
		// The result is ignored on purpose: a failure does not abort the
		// merge, it could just mean we already have that target.
		AddTarget(*incoming.ItemAt(position));
		position++;
	}
}
//! Copy data from original location - merging failed
bool
DelayedMessageData::CopyData()
{
Attachment* attached = NULL;
for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
attached = fAttachments.ItemAt(index);
if (attached == NULL || attached->data != NULL)
return false;
attached->data = malloc(attached->size);
if (attached->data == NULL)
return false;
memcpy(attached->data, attached->constData, attached->size);
}
fValid = true;
return true;
}
/*!	Tries to merge the not yet copied message \a other into this already
	valid message.

	Both messages must have the same code, the same merge mode (other than
	DM_NO_MERGE) and the same number of attachments. With DM_MERGE_CANCEL
	only the targets are merged. Otherwise each attachment pair must have
	the same size and pass Compare(); then the targets are merged and, for
	any mode but DM_MERGE_DUPLICATES, the data of \a other overwrites ours.

	\return \c true if \a other was merged and needs no scheduling of its
		own, \c false otherwise.
*/
bool
DelayedMessageData::MergeData(DelayedMessageData* other)
{
	if (!fValid || other == NULL)
		return false;

	if (other->fCode != fCode)
		return false;

	if (fMergeMode == DM_NO_MERGE
		|| other->fMergeMode == DM_NO_MERGE
		|| other->fMergeMode != fMergeMode) {
		return false;
	}

	const int32 count = fAttachments.CountItems();
	if (other->fAttachments.CountItems() != count)
		return false;

	if (other->fMergeMode == DM_MERGE_CANCEL) {
		MergeTargets(other);
		return true;
	}

	// Compare data
	for (int32 i = 0; i < count; i++) {
		Attachment* ours = fAttachments.ItemAt(i);
		Attachment* theirs = other->fAttachments.ItemAt(i);

		if (ours == NULL || theirs == NULL)
			return false;

		// Our side has been copied already, the other side is still only
		// referencing the caller's buffer.
		if (ours->data == NULL || theirs->constData == NULL)
			return false;

		if (ours->size != theirs->size)
			return false;

		// Compares depending upon mode & flags
		if (!Compare(ours, theirs, i))
			return false;
	}

	// add any targets not included in the existing message!
	MergeTargets(other);

	// since these are duplicates, we need not copy anything...
	if (fMergeMode == DM_MERGE_DUPLICATES)
		return true;

	// DM_MERGE_REPLACE:
	// Import the new data! Our memory is allocated already while the other
	// message has not copied anything yet, which saves allocations.
	for (int32 i = 0; i < count; i++) {
		Attachment* ours = fAttachments.ItemAt(i);
		Attachment* theirs = other->fAttachments.ItemAt(i);
		memcpy(ours->data, theirs->constData, ours->size);
	}

	return true;
}
//! Returns whether CopyData() has completed successfully.
bool
DelayedMessageData::IsValid() const
{
return fValid;
}
/*!	Records \a data as a new attachment. The buffer is only referenced; it
	is copied later by CopyData(). The arguments have already been
	sanity-checked by DelayedMessage::Attach().
	\return \c B_OK on success, \c B_NO_MEMORY if the attachment could not
		be allocated, \c B_ERROR if it could not be added to the list.
*/
status_t
DelayedMessageData::Attach(const void* data, size_t size)
{
	Attachment* attachment = new(std::nothrow) Attachment(data, size);
	if (attachment == NULL)
		return B_NO_MEMORY;

	if (fAttachments.AddItem(attachment))
		return B_OK;

	// The list did not take the attachment, it is still ours to delete.
	delete attachment;
	return B_ERROR;
}
/*!	Decides whether the attachment at \a index allows a merge.

	DM_MERGE_DUPLICATES: by default all data must match; with a mask set,
	only the attachments whose bit is set must match.
	DM_MERGE_REPLACE: by default no data needs to match; with a mask set,
	the attachments whose bit is set must match.
	Any other mode never requires a match.

	\param one Attachment of this message (compared via its copied data).
	\param two Attachment of the other message (compared via its original
		data).
	\param index Index of the attachment, selects the bit in the merge mask.
	\return \c true if no match is required or the data is identical.
*/
bool
DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
{
	bool mustMatch = false;

	if (fMergeMode == DM_MERGE_DUPLICATES) {
		mustMatch = fMergeMask == DM_DATA_DEFAULT
			|| (fMergeMask & (1 << index)) != 0;
	} else if (fMergeMode == DM_MERGE_REPLACE) {
		mustMatch = fMergeMask != DM_DATA_DEFAULT
			&& (fMergeMask & (1 << index)) != 0;
	}

	if (!mustMatch)
		return true;

	return memcmp(one->data, two->constData, one->size) == 0;
}
//! Stores the merge mode and the attachment mask used by MergeData().
void
DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
{
fMergeMode = mode;
fMergeMask = mask;
}
/*!	Reports a failed send to \a port through the failure callback, passing
	the message code and the registered callback data. Does nothing when no
	callback is set.
*/
void
DelayedMessageData::SendFailed(port_id port)
{
	if (fFailureCallback == NULL)
		return;

	fFailureCallback(fCode, port, fFailureData);
}
/*! Stores the callback that SendFailed() invokes, together with the opaque
\a data pointer that is passed back to it.
*/
void
DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
{
fFailureCallback = callback;
fFailureData = data;
}
// #pragma mark -
/*! Takes over the data of \a message. HandOff() returns NULL on failure, in
which case this object reports itself as invalid.
*/
ScheduledMessage::ScheduledMessage(DelayedMessage& message)
:
fData(message.HandOff())
{
}
//! Deletes the data received through the handoff (may be NULL).
ScheduledMessage::~ScheduledMessage()
{
delete fData;
}
//! Returns the number of remaining targets, 0 if there is no data.
int32
ScheduledMessage::CountTargets() const
{
	return fData == NULL ? 0 : fData->CountTargets();
}
//! Returns the time the message is scheduled for, 0 if there is no data.
bigtime_t
ScheduledMessage::ScheduledTime() const
{
	if (fData != NULL)
		return fData->ScheduledTime();

	return 0;
}
/*!	Send our message and data to their intended target(s).

	Each remaining target is tried exactly once per call. Targets that
	SendMessageToPort() removes from the list (sent successfully or port
	gone) disappear; the others stay for a later retry. Any error except
	\c B_TIMED_OUT is reported through the failure callback.

	\return The number of targets the message was sent to successfully.
*/
int32
ScheduledMessage::SendMessage()
{
	if (fData == NULL || !fData->IsValid())
		return 0;

	int32 sent = 0;
	int32 index = 0;
	while (index < fData->Targets().CountItems()) {
		const int32 countBefore = fData->Targets().CountItems();
		port_id port = *(fData->Targets().ItemAt(index));
		status_t error = SendMessageToPort(port);

		if (error == B_OK)
			++sent;
		else if (error != B_TIMED_OUT)
			fData->SendFailed(port);

		// SendMessageToPort() removes the target on B_OK and
		// B_BAD_PORT_ID, which moves the following targets down by one.
		// Only advance if the list kept its size, otherwise the target
		// that took this slot would be skipped.
		if (fData->Targets().CountItems() == countBefore)
			++index;
	}

	return sent;
}
/*!	Sends the message code and all attachments to \a port.

	The target is removed from the data when the send succeeded or the port
	turned out to be invalid; on other errors it is kept.

	\return \c B_BAD_DATA without valid data, \c B_BAD_VALUE for
		\c B_BAD_PORT_ID, \c B_ERROR if the message could not be started,
		otherwise the error of the failed Attach() or the result of Flush().
*/
status_t
ScheduledMessage::SendMessageToPort(port_id port)
{
	if (fData == NULL || !fData->IsValid())
		return B_BAD_DATA;

	if (port == B_BAD_PORT_ID)
		return B_BAD_VALUE;

	BPrivate::LinkSender sender(port);
	if (sender.StartMessage(fData->Code()) != B_OK)
		return B_ERROR;

	// The data has been checked already, so we assume it is all good
	AttachmentList& attachments = fData->Attachments();
	const int32 count = attachments.CountItems();
	for (int32 i = 0; i < count; i++) {
		Attachment* item = attachments.ItemAt(i);
		const status_t attachStatus = sender.Attach(item->data, item->size);
		if (attachStatus != B_OK) {
			sender.CancelMessage();
			return attachStatus;
		}
	}

	// We do not want to ever hold up the sender thread for too long, we
	// set a 1 second sending delay, which should be more than enough for
	// 99.992% of all cases. Approximately.
	const status_t flushStatus = sender.Flush(1000000);

	const bool targetDone = flushStatus == B_OK
		|| flushStatus == B_BAD_PORT_ID;
	if (targetDone)
		fData->RemoveTarget(port);

	return flushStatus;
}
//! Returns whether data was handed off and reports itself as valid.
bool
ScheduledMessage::IsValid() const
{
return fData != NULL && fData->IsValid();
}
/*!	Tries to merge \a other into this pending message.
	\return \c true if \a other was merged, \c false if this message is
		invalid or the data could not be merged.
*/
bool
ScheduledMessage::Merge(DelayedMessage& other)
{
	return IsValid() && fData->MergeData(other.Data());
}
/*!	Orders messages by their scheduled time. An invalid message on either
	side makes the comparison \c false.
*/
bool
ScheduledMessage::operator<(const ScheduledMessage& other) const
{
	const bool comparable = IsValid() && other.IsValid();
	return comparable
		&& fData->ScheduledTime() < other.fData->ScheduledTime();
}
/*!	Three-way comparison of two scheduled messages for sorting by time.

	A plain "*one < *two" converted to int is only ever 0 or 1 and never
	negative, which does not satisfy the comparator contract and leaves the
	list unsorted.

	\return A negative value if \a one is scheduled before \a two, a
		positive value if it is scheduled after \a two, and 0 if neither
		orders before the other (which includes invalid messages).
*/
int
CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
{
	if (*one < *two)
		return -1;

	if (*two < *one)
		return 1;

	return 0;
}
// #pragma mark -
/*! Sets up the lock, the message list, the sender thread and its port.
The thread is spawned in the initializer list but resumed only in the
body, after every member - including fPort - has been initialized.
NOTE(review): the results of spawn_thread() and create_port() are not
checked here.
*/
DelayedMessageSender::DelayedMessageSender()
:
fLock("DelayedMessageSender"),
fMessages(20, true),
fScheduledWakeup(B_INFINITE_TIMEOUT),
fWakeupRetry(0),
fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
fPort(create_port(kPortCapacity, "DelayedMessageSender")),
fSentCount(0)
{
resume_thread(fThread);
}
/*!	Asks the sender thread to quit, waits for it to exit and deletes the
	port afterwards.
*/
DelayedMessageSender::~DelayedMessageSender()
{
	// write the exit message to our port
	write_port(fPort, kExitMessage, NULL, 0);

	// Retry only if the wait itself got interrupted. Any other result means
	// the thread has exited or cannot be waited for, so waiting again would
	// be pointless.
	status_t status = B_OK;
	while (wait_for_thread(fThread, &status) == B_INTERRUPTED)
		;

	// The thread is gone (or never existed), it is safe to clean up.
	delete_port(fPort);
}
/*!	Schedules \a message for sending.

	The message is first offered to every pending message for merging. If
	none accepts it, its data is handed off into a new ScheduledMessage that
	is added to the list, and the sender thread is woken up if necessary.

	\return \c B_OK if the message was merged or scheduled, \c B_NO_MEMORY
		if the scheduled message could not be allocated, \c B_BAD_DATA if
		the handoff failed, \c B_ERROR if it could not be added to the list.
*/
status_t
DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
{
	BAutolock _(fLock);

	// Can we merge with a pending message?
	ScheduledMessage* pending = NULL;
	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
		pending = fMessages.ItemAt(index);
		if (pending->Merge(message))
			return B_OK;
	}

	// Guess not, add it to our list!
	ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);

	if (scheduled == NULL)
		return B_NO_MEMORY;

	if (!scheduled->IsValid()) {
		delete scheduled;
		return B_BAD_DATA;
	}

	if (!fMessages.AddItem(scheduled)) {
		// The list did not take the message. It already received the data
		// through the handoff, so deleting it releases the data as well.
		delete scheduled;
		return B_ERROR;
	}

	fMessages.SortItems(&CompareMessages);
	_Wakeup(scheduled->ScheduledTime());
	return B_OK;
}
//! Returns the number of messages in the schedule, read under fLock.
int32
DelayedMessageSender::CountDelayedMessages() const
{
BAutolock _(fLock);
return fMessages.CountItems();
}
//! Returns the running total of successful sends, read atomically.
int64
DelayedMessageSender::CountSentMessages() const
{
return atomic_get64(&fSentCount);
}
void
DelayedMessageSender::_MessageLoop()
{
int32 code = -1;
status_t status = B_TIMED_OUT;
bigtime_t timeout = B_INFINITE_TIMEOUT;
while (true) {
timeout = atomic_get64(&fScheduledWakeup) - (system_time()
+ (DM_MINIMUM_DELAY / 2));
if (timeout > DM_MINIMUM_DELAY / 4) {
status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
timeout);
} else
status = B_TIMED_OUT;
if (status == B_INTERRUPTED)
continue;
if (status == B_TIMED_OUT) {
_SendDelayedMessages();
continue;
}
if (status == B_OK) {
switch (code) {
case kWakeupMessage:
continue;
case kExitMessage:
return;
// TODO: trace unhandled messages
default:
continue;
}
}
// port deleted?
if (status < B_OK)
break;
}
}
/*!	Entry point of the sender thread.
	\param sender The DelayedMessageSender whose message loop is to be run.
	\return Always 0.
*/
int32
DelayedMessageSender::_thread_func(void* sender)
{
	DelayedMessageSender* self = static_cast<DelayedMessageSender*>(sender);
	self->_MessageLoop();
	return 0;
}
/*!	Sends pending messages, call ONLY from sender thread!

	Sends every message that is due within half the minimum delay, removes
	the messages that have no targets left and stores the time of the next
	wakeup in fScheduledWakeup. If the lock cannot be acquired quickly, the
	wakeup is pushed back by the minimum delay instead.

	\return The number of successful sends.
*/
int32
DelayedMessageSender::_SendDelayedMessages()
{
	// avoid sending messages during times of contention
	if (fLock.LockWithTimeout(30000) != B_OK) {
		atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
		return 0;
	}

	atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);

	if (fMessages.CountItems() == 0) {
		fLock.Unlock();
		return 0;
	}

	int32 sent = 0;

	bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
		// capture any that may be on the verge of being sent.

	BObjectList<ScheduledMessage> remove;

	ScheduledMessage* message = NULL;
	for (int32 index = 0; index < fMessages.CountItems(); ++index) {
		message = fMessages.ItemAt(index);

		if (message->ScheduledTime() > time) {
			// Not due yet, this is when we have to wake up next.
			atomic_set64(&fScheduledWakeup, message->ScheduledTime());
			break;
		}

		int32 sendCount = message->SendMessage();
		if (sendCount > 0)
			sent += sendCount;

		if (message->CountTargets() == 0)
			remove.AddItem(message);
	}

	// remove serviced messages
	for (int32 index = 0; index < remove.CountItems(); ++index)
		fMessages.RemoveItem(remove.ItemAt(index));

	atomic_add64(&fSentCount, sent);

	// catch any partly-failed messages (possibly late):
	if (fMessages.CountItems() > 0
		&& atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {

		fMessages.SortItems(&CompareMessages);
		message = fMessages.ItemAt(0);
		bigtime_t timeout = message->ScheduledTime() - time;

		if (timeout < 0)
			timeout = DM_MINIMUM_DELAY;

		// fScheduledWakeup holds an absolute time everywhere else (the
		// message loop subtracts system_time() from it), so the relative
		// retry delay has to be turned into one. Storing the bare delay
		// would make the loop retry immediately.
		atomic_set64(&fScheduledWakeup, system_time() + timeout);
	}

	fLock.Unlock();
	return sent;
}
/*!	Makes sure the sender thread wakes up at \a when.

	Nothing is done if the thread is already going to wake up earlier and no
	previous wakeup attempt is waiting for a retry. Otherwise the wakeup
	time is replaced and a wakeup message is sent to the port so that the
	thread recomputes its timeout. If sending times out, fWakeupRetry is set
	so that the next call tries again.
*/
void
DelayedMessageSender::_Wakeup(bigtime_t when)
{
	const bool wakesEarlier = atomic_get64(&fScheduledWakeup) < when;
	if (wakesEarlier && atomic_get(&fWakeupRetry) == 0)
		return;

	atomic_set64(&fScheduledWakeup, when);

	BPrivate::LinkSender sender(fPort);
	sender.StartMessage(kWakeupMessage);
	const status_t error = sender.Flush(30000);

	// Remember a timed out wakeup, it has to be repeated.
	atomic_set(&fWakeupRetry, error == B_TIMED_OUT ? 1 : 0);
}