haiku/src/servers/registrar/MessagingService.cpp

465 lines
10 KiB
C++

/*
* Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
* Distributed under the terms of the MIT License.
*/
#include <map>
#include <new>
#include <string.h>
#include <Autolock.h>
#include <syscalls.h>
#include "Debug.h"
#include "MessageDeliverer.h"
#include "MessagingService.h"
using std::map;
using std::nothrow;
// sService -- the singleton instance
// Set by CreateDefault(), reset to NULL by DeleteDefault() and handed out by
// Default(). NULL as long as no default service has been created.
MessagingService *MessagingService::sService = NULL;
/*! \class MessagingArea
\brief Represents an area of the messaging service shared between kernel
and registrar.
The main purpose of the class is to retrieve (and remove) commands from
the area.
*/
// constructor
/*!	\brief Creates an unattached MessagingArea.

	All members are set to "invalid" values. This matters in particular for
	\c fID: the destructor calls delete_area() for any \c fID >= 0, and
	Create() deletes the object when cloning the kernel area fails, i.e.
	before \c fID has been assigned. Without this initialization an
	arbitrary area ID could be passed to delete_area() in that case.
*/
MessagingArea::MessagingArea()
{
	fHeader = NULL;
	fID = -1;
	fSize = 0;
	fLockSem = -1;
	fCounterSem = -1;
	fNextArea = NULL;
}
// destructor
/*!	\brief Deletes the cloned area, if the object owns one.

	Only the area identified by \c fID is released; the object referenced by
	\c fNextArea is not touched.
*/
MessagingArea::~MessagingArea()
{
	if (fID < 0)
		return;

	delete_area(fID);
}
// Create
/*!	\brief Allocates a MessagingArea and attaches it to a clone of the given
		   kernel area.

	\param kernelAreaID The ID of the area to be cloned.
	\param lockSem The semaphore stored as the area's lock semaphore.
	\param counterSem The semaphore stored as the area's counter semaphore.
	\param _area Set to the new object on success; left untouched on failure.
	\return \c B_OK on success, \c B_NO_MEMORY if the object could not be
			allocated, or the error code returned by clone_area().
*/
status_t
MessagingArea::Create(area_id kernelAreaID, sem_id lockSem, sem_id counterSem,
	MessagingArea *&_area)
{
	MessagingArea *newArea = new(nothrow) MessagingArea;
	if (newArea == NULL)
		return B_NO_MEMORY;

	// map the kernel's area; the header is found at the start of the clone
	area_id cloneID = clone_area("messaging", (void**)&newArea->fHeader,
		B_ANY_ADDRESS, B_READ_AREA | B_WRITE_AREA, kernelAreaID);
	if (cloneID < 0) {
		delete newArea;
		return cloneID;
	}

	// the clone succeeded -- fill in the remaining members
	newArea->fID = cloneID;
	newArea->fSize = newArea->fHeader->size;
	newArea->fLockSem = lockSem;
	newArea->fCounterSem = counterSem;
	newArea->fNextArea = NULL;

	_area = newArea;
	return B_OK;
}
// Lock
bool
MessagingArea::Lock()
{
// benaphore-like locking
if (atomic_add(&fHeader->lock_counter, 1) == 0)
return true;
return (acquire_sem(fLockSem) == B_OK);
}
// Unlock
void
MessagingArea::Unlock()
{
if (atomic_add(&fHeader->lock_counter, -1) > 1)
release_sem(fLockSem);
}
// ID
/*!	\brief Returns the ID of the cloned area.
	\return The area ID stored by Create().
*/
area_id
MessagingArea::ID() const
{
	return this->fID;
}
// Size
/*!	\brief Returns the size of the area.
	\return The size Create() read from the area header.
*/
int32
MessagingArea::Size() const
{
	return this->fSize;
}
// CountCommands
/*!	\brief Returns the number of commands currently in the area.
	\return The command count as found in the area header.
*/
int32
MessagingArea::CountCommands() const
{
	return this->fHeader->command_count;
}
// PopCommand
/*!	\brief Removes the first command from the area and returns it.

	The returned pointer refers to memory inside the area. As long as the
	area is still locked, no one will overwrite the contents.

	\return The first command, or \c NULL, if the area contains no commands.
*/
const messaging_command *
MessagingArea::PopCommand()
{
	if (fHeader->command_count == 0)
		return NULL;

	// command offsets are relative to the start of the area header
	char *areaBase = (char*)fHeader;
	messaging_command *first
		= (messaging_command*)(areaBase + fHeader->first_command);

	// unlink the command
	fHeader->command_count--;
	if (fHeader->command_count != 0) {
		fHeader->first_command = first->next_command;
	} else {
		// that was the last one -- reset the list
		fHeader->last_command = 0;
		fHeader->first_command = 0;
	}

	return first;
}
// Discard
void
MessagingArea::Discard()
{
fHeader->size = 0;
}
// NextKernelAreaID
/*!	\brief Returns the ID of the next kernel area as recorded in the header.
	\return The \c next_kernel_area field of the area header. Callers in this
			file treat a negative value as "no next area".
*/
area_id
MessagingArea::NextKernelAreaID() const
{
	return this->fHeader->next_kernel_area;
}
// SetNextArea
/*!	\brief Sets the object following this one in the area list.
	\param area The next MessagingArea, may be \c NULL.
*/
void
MessagingArea::SetNextArea(MessagingArea *area)
{
	this->fNextArea = area;
}
// NextArea
/*!	\brief Returns the object following this one in the area list.
	\return The next MessagingArea, or \c NULL, if none has been set.
*/
MessagingArea *
MessagingArea::NextArea() const
{
	return this->fNextArea;
}
// #pragma mark -
// constructor
/*!	\brief Creates a command handler. Nothing to be done here. */
MessagingCommandHandler::MessagingCommandHandler()
{
	// intentionally empty
}
// destructor
/*!	\brief Frees the command handler. Nothing to be done here. */
MessagingCommandHandler::~MessagingCommandHandler()
{
	// intentionally empty
}
// #pragma mark -
// DefaultSendCommandHandler
/*!	\brief Handler that passes the message contained in a command on to the
		   default MessageDeliverer.

	The command data is read as a \c messaging_command_send_message header,
	followed by \c target_count \c messaging_target entries, followed by the
	message data of \c message_size bytes.
*/
class MessagingService::DefaultSendCommandHandler
	: public MessagingCommandHandler {
	/*!	\brief Delivers the message contained in \a data to its targets.
		\param _command The command code (not used).
		\param data The command data, laid out as described for the class.
		\param dataSize The size of the command data (not used).
	*/
	virtual void HandleMessagingCommand(uint32 _command, const void *data,
		int32 dataSize)
	{
		const messaging_command_send_message *header
			= (const messaging_command_send_message*)data;

		// the message follows the header and the array of targets
		const uint8 *bytes = (const uint8*)data;
		const void *message = bytes
			+ sizeof(messaging_command_send_message)
			+ sizeof(messaging_target) * header->target_count;

		DefaultMessagingTargetSet targets(header->targets,
			header->target_count);
		MessageDeliverer::Default()->DeliverMessage(message,
			header->message_size, targets);
	}
};
// CommandHandlerMap
/*!	\brief Maps command codes to the handlers responsible for them.

	The map stores plain pointers; it does not delete the handlers.
*/
struct MessagingService::CommandHandlerMap
	: std::map<uint32, MessagingCommandHandler*> {
};
/*! \class MessagingService
\brief Userland implementation of the kernel -> userland messaging service.
This service provides a way for the kernel to send BMessages (usually
notification (e.g. node monitoring) messages) to userland applications.
The kernel could write the messages directly to the respective target ports,
but this has the disadvantage that a message must be dropped if the
port is full at the moment of sending. By transferring the message to the
registrar, it is possible to use the MessageDeliverer, which retries sending
messages on full ports.
The message transfer is implemented via areas shared between kernel
and registrar. By default one area is used as a ring buffer. The kernel
adds messages to it, the registrar removes them. If the area is full, the
kernel creates a new one and adds it to the area list.
While the service is called `messaging service' and we were speaking of
`messages' being passed through the areas, the service is actually more
general. In fact `commands' are passed through the areas. Currently the
only implemented command type is to send a message, but it is very easy
to add further command types (e.g. one for alerting the user in case of
errors).
The MessagingService maintains a mapping of command types to command
handlers (MessagingCommandHandler, which perform the actual processing
of the commands), that can be altered via
MessagingService::SetCommandHandler().
*/
// constructor
/*!	\brief Creates an uninitialized messaging service.

	No semaphores, areas, handler map or thread exist yet; Init() has to be
	called to set them up.
*/
MessagingService::MessagingService()
	:
	fLock("messaging service"),
	fLockSem(-1),
	fCounterSem(-1),
	fFirstArea(NULL),
	fCommandHandlers(NULL),
	fCommandProcessor(-1),
	fTerminating(false)
{
	// everything else is done in Init()
}
// destructor
/*!	\brief Shuts down the command processor and frees all resources.

	The termination flag is set and both semaphores are deleted before the
	command processor thread is waited for, so that a thread blocked in
	acquire_sem() returns and sees the flag. Afterwards the handler map and
	all areas in the area list are deleted.

	The handlers stored in the map are not deleted, only the map itself.
	NOTE(review): who owns the handlers is not visible here -- confirm with
	the callers of SetCommandHandler().
*/
MessagingService::~MessagingService()
{
	fTerminating = true;

	// deleting the semaphores unblocks the command processor
	if (fLockSem >= 0)
		delete_sem(fLockSem);
	if (fCounterSem >= 0)
		delete_sem(fCounterSem);

	if (fCommandProcessor >= 0) {
		int32 result;
		wait_for_thread(fCommandProcessor, &result);
	}

	delete fCommandHandlers;

	// Delete the whole area list, not only its head: ~MessagingArea() does
	// not delete the next area, so the successors would be leaked otherwise.
	MessagingArea *area = fFirstArea;
	fFirstArea = NULL;
	while (area != NULL) {
		MessagingArea *nextArea = area->NextArea();
		delete area;
		area = nextArea;
	}
}
// Init
/*!	\brief Initializes the service.

	Creates the lock and counter semaphores and the command handler map,
	spawns the command processor thread, registers the service with the
	kernel, clones the first kernel area, installs the default handler for
	\c MESSAGING_COMMAND_SEND_MESSAGE and finally resumes the command
	processor.

	The default handler is allocated before the service is registered with
	the kernel and installed before the command processor is resumed. That
	way an allocation failure cannot leave a registered, running service
	behind, and no command can be processed (and dropped for lack of a
	handler) before the handler is in place.

	On failure the members that were already set up are left as they are;
	the destructor releases them.

	\return \c B_OK on success, \c B_NO_MEMORY if an allocation failed, or
			the error code of the call that failed.
*/
status_t
MessagingService::Init()
{
	// create the semaphores
	fLockSem = create_sem(0, "messaging lock");
	if (fLockSem < 0)
		return fLockSem;

	fCounterSem = create_sem(0, "messaging counter");
	if (fCounterSem < 0)
		return fCounterSem;

	// create the command handler map
	fCommandHandlers = new(nothrow) CommandHandlerMap;
	if (!fCommandHandlers)
		return B_NO_MEMORY;

	// allocate the default send message command handler up front, so that
	// nothing can fail anymore once the kernel knows about us
	DefaultSendCommandHandler *handler
		= new(nothrow) DefaultSendCommandHandler;
	if (!handler)
		return B_NO_MEMORY;

	// spawn the command processor (it remains suspended for now)
	fCommandProcessor = spawn_thread(MessagingService::_CommandProcessorEntry,
		"messaging command processor", B_DISPLAY_PRIORITY, this);
	if (fCommandProcessor < 0) {
		delete handler;
		return fCommandProcessor;
	}

	// register with the kernel
	area_id areaID = _kern_register_messaging_service(fLockSem, fCounterSem);
	if (areaID < 0) {
		delete handler;
		return areaID;
	}

	// create the area
	status_t error = MessagingArea::Create(areaID, fLockSem, fCounterSem,
		fFirstArea);
	if (error != B_OK) {
		_kern_unregister_messaging_service();
		delete handler;
		return error;
	}

	// install the default handler before any command can be processed
	SetCommandHandler(MESSAGING_COMMAND_SEND_MESSAGE, handler);

	// resume the command processor
	resume_thread(fCommandProcessor);

	return B_OK;
}
// CreateDefault
/*!	\brief Creates and initializes the default (singleton) service.

	Does nothing, if the default service exists already.

	\return \c B_OK, if the default service exists after the call,
			\c B_NO_MEMORY, if it could not be allocated, or the error
			returned by Init().
*/
status_t
MessagingService::CreateDefault()
{
	if (sService != NULL)
		return B_OK;

	MessagingService *newService = new(nothrow) MessagingService;
	if (newService == NULL)
		return B_NO_MEMORY;

	status_t status = newService->Init();
	if (status == B_OK) {
		// publish the fully initialized service
		sService = newService;
		return B_OK;
	}

	// initialization failed -- get rid of the half-built object
	delete newService;
	return status;
}
// DeleteDefault
/*!	\brief Deletes the default service, if there is one, and resets the
		   singleton pointer to \c NULL.
*/
void
MessagingService::DeleteDefault()
{
	if (sService == NULL)
		return;

	delete sService;
	sService = NULL;
}
// Default
/*!	\brief Returns the default service.
	\return The singleton instance, or \c NULL, if it has not been created
			(or has been deleted again).
*/
MessagingService *
MessagingService::Default()
{
	MessagingService *service = sService;
	return service;
}
// SetCommandHandler
/*!	\brief Sets, replaces or removes the handler for a command.

	The handler map is modified with the service lock held. A handler that
	gets replaced or removed is not deleted.

	\param command The command code.
	\param handler The handler to be installed for \a command, or \c NULL to
		   remove an existing entry.
*/
void
MessagingService::SetCommandHandler(uint32 command,
	MessagingCommandHandler *handler)
{
	BAutolock locker(fLock);

	if (handler == NULL) {
		// no handler: remove an existing entry (no-op, if there is none)
		fCommandHandlers->erase(command);
		return;
	}

	(*fCommandHandlers)[command] = handler;
}
// _GetCommandHandler
/*!	\brief Looks up the handler for a command, with the service lock held.
	\param command The command code.
	\return The handler registered for \a command, or \c NULL, if there is
			none.
*/
MessagingCommandHandler *
MessagingService::_GetCommandHandler(uint32 command) const
{
	BAutolock locker(fLock);

	CommandHandlerMap::iterator found = fCommandHandlers->find(command);
	if (found == fCommandHandlers->end())
		return NULL;

	return found->second;
}
// _CommandProcessorEntry
/*!	\brief Entry function of the command processor thread.
	\param data The MessagingService object (passed to spawn_thread() by
		   Init()).
	\return Whatever _CommandProcessor() returns.
*/
int32
MessagingService::_CommandProcessorEntry(void *data)
{
	MessagingService *service = static_cast<MessagingService*>(data);
	return service->_CommandProcessor();
}
// _CommandProcessor
// Main loop of the command processor thread. Until fTerminating is set it
// waits on the counter semaphore, locks the first area, pops all commands
// from it and passes each one to the handler registered for its command
// code. Afterwards it attaches a kernel area it does not know yet and
// discards the first area, if it is empty and has a successor.
// Always returns 0.
int32
MessagingService::_CommandProcessor()
{
// Set when a new area has been attached: the next iteration then skips
// waiting on the semaphore and looks at the areas right away.
bool commandWaiting = false;
while (!fTerminating) {
// wait for the next command
if (!commandWaiting) {
status_t error = acquire_sem(fCounterSem);
// On failure just re-check fTerminating; the destructor sets the flag
// before it deletes the semaphore.
if (error != B_OK)
continue;
} else
commandWaiting = false;
// get it from the first area
MessagingArea *area = fFirstArea;
// NOTE(review): the return value of Lock() is ignored; if acquiring the
// lock semaphore fails, the area is processed without holding the lock.
area->Lock();
// Drain the area. The handlers are invoked while the area is still locked,
// so the command data inside the area is not overwritten meanwhile.
while (area->CountCommands() > 0) {
const messaging_command *command = area->PopCommand();
if (!command) {
// something's seriously wrong
// NOTE(review): unlike the other messages in this function, this one has
// no trailing newline.
ERROR("MessagingService::_CommandProcessor(): area %p (%"
B_PRId32 ") has command count %" B_PRId32 ", but doesn't "
"return any more commands.", area, area->ID(),
area->CountCommands());
break;
}
PRINT("MessagingService::_CommandProcessor(): got command %" B_PRIu32 "\n",
command->command);
// dispatch the command
MessagingCommandHandler *handler
= _GetCommandHandler(command->command);
if (handler) {
// the data size passed on is the command size without the command header
handler->HandleMessagingCommand(command->command, command->data,
command->size - sizeof(messaging_command));
} else {
// commands without a handler are dropped
WARNING("MessagingService::_CommandProcessor(): No handler "
"found for command %" B_PRIu32 "\n", command->command);
}
}
// there is a new area we don't know yet
if (!area->NextArea() && area->NextKernelAreaID() >= 0) {
// create it
MessagingArea *nextArea;
status_t error = MessagingArea::Create(area->NextKernelAreaID(),
fLockSem, fCounterSem, nextArea);
if (error == B_OK) {
area->SetNextArea(nextArea);
// look at the areas again without waiting on the semaphore
commandWaiting = true;
} else {
// Bad, but what can we do?
// (cloning is attempted again the next time this area is processed, since
// the next area pointer is still unset)
ERROR("MessagingService::_CommandProcessor(): Failed to clone "
"kernel area %" B_PRId32 ": %s\n", area->NextKernelAreaID(),
strerror(error));
}
}
// if the current area is empty and there is a next one, we discard the
// current one
if (area->NextArea() && area->CountCommands() == 0) {
// The successor becomes the first area. The old one is marked discarded
// (its header size is set to 0) while it is still locked, then unlocked
// and deleted, which also deletes our clone of the area.
fFirstArea = area->NextArea();
area->Discard();
area->Unlock();
delete area;
} else {
area->Unlock();
}
}
return 0;
}