haiku/src/kits/app/LinkSender.cpp

336 lines
6.9 KiB
C++

/*
* Copyright 2001-2005, Haiku.
* Distributed under the terms of the MIT License.
*
* Authors:
* Pahtz <pahtz@yahoo.com.au>
* Axel Dörfler, axeld@pinc-software.de
*/
/** Class for low-overhead port-based messaging */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <new>
#include <ServerProtocol.h>
#include <LinkSender.h>
#include "link_message.h"
#include "syscalls.h"
//#define DEBUG_BPORTLINK
#ifdef DEBUG_BPORTLINK
# include <stdio.h>
# define STRACE(x) printf x
#else
# define STRACE(x) ;
#endif
static const size_t kMaxStringSize = 4096;
static const size_t kWatermark = kInitialBufferSize - 24;
// if a message is started after this mark, the buffer is flushed automatically
namespace BPrivate {
LinkSender::LinkSender(port_id port)
:
fPort(port),
fTargetTeam(-1),
fBuffer(NULL),
fBufferSize(0),
fCurrentEnd(0),
fCurrentStart(0),
fCurrentStatus(B_OK)
{
}
LinkSender::~LinkSender()
{
free(fBuffer);
}
void
LinkSender::SetPort(port_id port)
{
fPort = port;
}
status_t
LinkSender::StartMessage(int32 code, size_t minSize)
{
// end previous message
if (EndMessage() < B_OK)
CancelMessage();
if (minSize > kMaxBufferSize - sizeof(message_header)) {
// we will handle this case in Attach, using an area
minSize = sizeof(area_id);
}
minSize += sizeof(message_header);
// Eventually flush buffer to make space for the new message.
// Note, we do not take the actual buffer size into account to not
// delay the time between buffer flushes too much.
if (fBufferSize > 0 && (minSize > SpaceLeft() || fCurrentStart >= kWatermark)) {
status_t status = Flush();
if (status < B_OK)
return status;
}
if (minSize > fBufferSize) {
if (AdjustBuffer(minSize) != B_OK)
return fCurrentStatus = B_NO_MEMORY;
}
message_header *header = (message_header *)(fBuffer + fCurrentStart);
header->size = 0;
// will be set later
header->code = code;
header->flags = 0;
STRACE(("info: LinkSender buffered header %ld (%lx) [%lu %lu %lu].\n",
code, code, header->size, header->code, header->flags));
fCurrentEnd += sizeof(message_header);
return B_OK;
}
status_t
LinkSender::EndMessage(bool needsReply)
{
if (fCurrentEnd == fCurrentStart || fCurrentStatus < B_OK)
return fCurrentStatus;
// record the size of the message
message_header *header = (message_header *)(fBuffer + fCurrentStart);
header->size = CurrentMessageSize();
if (needsReply)
header->flags |= needsReply;
STRACE(("info: LinkSender EndMessage() of size %ld.\n", header->size));
// bump to start of next message
fCurrentStart = fCurrentEnd;
return B_OK;
}
void
LinkSender::CancelMessage()
{
fCurrentEnd = fCurrentStart;
fCurrentStatus = B_OK;
}
status_t
LinkSender::Attach(const void *passedData, size_t passedSize)
{
size_t size = passedSize;
const void* data = passedData;
if (fCurrentStatus < B_OK)
return fCurrentStatus;
if (size == 0)
return fCurrentStatus = B_BAD_VALUE;
if (fCurrentEnd == fCurrentStart)
return B_NO_INIT; // need to call StartMessage() first
bool useArea = false;
if (size >= kMaxBufferSize) {
useArea = true;
size = sizeof(area_id);
}
if (SpaceLeft() < size) {
// we have to make space for the data
status_t status = FlushCompleted(size + CurrentMessageSize());
if (status < B_OK)
return fCurrentStatus = status;
}
area_id senderArea = -1;
if (useArea) {
if (fTargetTeam < 0) {
port_info info;
status_t result = get_port_info(fPort, &info);
if (result != B_OK)
return result;
fTargetTeam = info.team;
}
void* address = NULL;
off_t alignedSize = (passedSize + B_PAGE_SIZE) & ~(B_PAGE_SIZE - 1);
senderArea = create_area("LinkSenderArea", &address, B_ANY_ADDRESS,
alignedSize, B_NO_LOCK, B_READ_AREA | B_WRITE_AREA);
if (senderArea < B_OK)
return senderArea;
data = &senderArea;
memcpy(address, passedData, passedSize);
area_id areaID = senderArea;
senderArea = _kern_transfer_area(senderArea, &address,
B_ANY_ADDRESS, fTargetTeam);
if (senderArea < B_OK) {
delete_area(areaID);
return senderArea;
}
}
memcpy(fBuffer + fCurrentEnd, data, size);
fCurrentEnd += size;
return B_OK;
}
status_t
LinkSender::AttachString(const char *string, int32 length)
{
if (string == NULL)
string = "";
size_t maxLength = strlen(string);
if (length == -1) {
length = (int32)maxLength;
// we should report an error here
if (maxLength > kMaxStringSize)
length = 0;
} else if (length > (int32)maxLength)
length = maxLength;
status_t status = Attach<int32>(length);
if (status < B_OK)
return status;
if (length > 0) {
status = Attach(string, length);
if (status < B_OK)
fCurrentEnd -= sizeof(int32); // rewind the transaction
}
return status;
}
status_t
LinkSender::AdjustBuffer(size_t newSize, char **_oldBuffer)
{
// make sure the new size is within bounds
if (newSize <= kInitialBufferSize)
newSize = kInitialBufferSize;
else if (newSize > kMaxBufferSize)
return B_BUFFER_OVERFLOW;
else if (newSize > kInitialBufferSize)
newSize = (newSize + B_PAGE_SIZE - 1) & ~(B_PAGE_SIZE - 1);
if (newSize == fBufferSize) {
// keep existing buffer
if (_oldBuffer)
*_oldBuffer = fBuffer;
return B_OK;
}
// create new larger buffer
char *buffer = (char *)malloc(newSize);
if (buffer == NULL)
return B_NO_MEMORY;
if (_oldBuffer)
*_oldBuffer = fBuffer;
else
free(fBuffer);
fBuffer = buffer;
fBufferSize = newSize;
return B_OK;
}
status_t
LinkSender::FlushCompleted(size_t newBufferSize)
{
// we need to hide the incomplete message so that it's not flushed
int32 end = fCurrentEnd;
int32 start = fCurrentStart;
fCurrentEnd = fCurrentStart;
status_t status = Flush();
if (status < B_OK) {
fCurrentEnd = end;
return status;
}
char *oldBuffer = NULL;
status = AdjustBuffer(newBufferSize, &oldBuffer);
if (status != B_OK)
return status;
// move the incomplete message to the start of the buffer
fCurrentEnd = end - start;
if (oldBuffer != fBuffer) {
memcpy(fBuffer, oldBuffer + start, fCurrentEnd);
free(oldBuffer);
} else
memmove(fBuffer, fBuffer + start, fCurrentEnd);
return B_OK;
}
status_t
LinkSender::Flush(bigtime_t timeout, bool needsReply)
{
if (fCurrentStatus < B_OK)
return fCurrentStatus;
EndMessage(needsReply);
if (fCurrentStart == 0)
return B_OK;
STRACE(("info: LinkSender Flush() waiting to send messages of %ld bytes on port %ld.\n",
fCurrentEnd, fPort));
status_t err;
if (timeout != B_INFINITE_TIMEOUT) {
do {
err = write_port_etc(fPort, kLinkCode, fBuffer,
fCurrentEnd, B_RELATIVE_TIMEOUT, timeout);
} while (err == B_INTERRUPTED);
} else {
do {
err = write_port(fPort, kLinkCode, fBuffer, fCurrentEnd);
} while (err == B_INTERRUPTED);
}
if (err < B_OK) {
STRACE(("error info: LinkSender Flush() failed for %ld bytes (%s) on port %ld.\n",
fCurrentEnd, strerror(err), fPort));
return err;
}
STRACE(("info: LinkSender Flush() messages total of %ld bytes on port %ld.\n",
fCurrentEnd, fPort));
fCurrentEnd = 0;
fCurrentStart = 0;
return B_OK;
}
} // namespace BPrivate