MTask
Preemptive multitasking executive for protected mode DOS
MTask is a fully preemptive multitasking executive for PCs with a single 80386 or higher
CPU. It supports fixed priorities and time slicing. This version is written for
the Digital Mars C compiler with the X32 DOS extender, and generates protected
mode executables that run on plain DOS. Programs using MTask should be compiled
with the -mx memory model flag and linked with mtask.lib and x32.lib (the
version of X32 that does not use virtual memory – see the limitations below).
Preemptions are not enabled for interrupts originating in real mode or X32 code.
This limitation is imposed by X32. DOS blocking calls, for example, should be
avoided since the task cannot be preempted in real mode (they should be avoided
anyway since they do busy waiting and consume CPU unnecessarily).
This software is public domain. You may use it for whatever you like.
Contacting the author
Please send any comments, criticism, improvements or bug reports to:
Hugo Etchegoyen
Buenos Aires, Argentina
hee@fibertel.com.ar
hetchegoyen@hasar.com
The MTask library
The MTask library contains:
1. The multitasking kernel proper, whose API is defined in mtask.h:
· Kernel.obj – Task management, scheduling, memory management, task queue
primitives, inter-task message passing, context switching, timing.
· Queue.obj – Task queues.
· Irq.obj – Hardware interrupts.
· Math.obj – Saving and restoring coprocessor status.
2. Additional modules. Each one has a header file defining the respective API.
· Sem.obj (sem.h) – Semaphores.
· Mutex.obj (mutex.h) – Mutexes (mutual exclusion objects).
· Monitor.obj (monitor.h) – Monitors and condition variables.
· Pipe.obj (pipe.h) – Pipes.
· Msgqueue.obj (msgqueue.h) – Fixed size message queues.
A task is an execution thread. Each task has:
1. A function body.
2. A stack which is dynamically allocated when the task is created.
3. A context, basically the register set and the math coprocessor status, if available.
4. A control block, which holds the task’s stack pointer, some context variables
and some other information used for message passing and synchronization.
A task’s body or function is of the type:
void task(void *arg)
When created, each task is allocated a stack
and passed a single arg argument. There is no limitation to the number
of tasks that can be created (except available memory). Several tasks can share
the same function body. A task ends by returning from its function body or by
executing the Exit() primitive (note the capital E). Executing the
standard C library exit() function, or returning from main(), will end
the whole program with all its tasks.
As with any other multithreading system, global
data are shared by all tasks. It is the programmer’s responsibility to
serialize the use of shared resources. These include the operating system
(DOS), static and global variables and non-reentrant functions, either in the
application or in libraries, including the standard C library.
Task states
Tasks can be in any of the following states:
· Suspended
· Ready
· Running
· Delaying
· Waiting (in a task queue)
· Sending (a message)
· Receiving (a message)
· Terminated
Tasks are created in the suspended state.
In order to start the task running, it must be put in the ready state
by calling Ready().
This being a single-processor system, only one
task may be assigned the CPU at a time. This task is called the current
task, and it is in the running state. If all the user tasks are blocked
(none of them can run), MTask runs an internal task that does nothing and never
blocks, the null task. The null task has minimum priority and will
relinquish the CPU as soon as some other task is ready.
Within MTask there is a function (the
scheduler) that chooses which of the currently ready to run tasks should be
assigned the CPU. If the scheduler finds that the current task must be changed,
it performs a context switch operation. Context switching is performed
by saving the register context in the current task’s stack, saving the current
stack pointer and additional context information in the current task’s control
block, and restoring the stack and context from the new current task.
Tasks in the delaying state are waiting for
some period of time to elapse. Tasks waiting in a task queue are blocked until
some other task (or an interrupt) unblocks them and puts them again in the
ready state. Tasks in the sending or receiving states are blocked until some
other task performs the complementary operation (receiving or sending,
respectively). A message is then transferred and the previously blocked task is
made ready again.
When a task is terminated its resources are
freed. However, this cannot be done immediately when a task terminates itself,
either by executing Exit() or by returning from its function body. In
this case the task enters the terminated state, and its resources will
be freed later.
MTask is preemptive, with fixed
priorities and time slicing.
Preemptive scheduling means any task may lose the CPU as
a consequence of a hardware interrupt. Preemptions may be disabled on a
task-by-task basis; by default all tasks start running in preemptive mode. A task may
put itself in non-preemptive (or cooperative) mode. This may be useful
for executing some code sections atomically. When a task is running in
cooperative mode, it will only relinquish the CPU by executing a blocking
primitive. Note that this mechanism protects the task from losing the CPU as a
result of an interrupt, but does not protect it against the execution of
interrupt code. If a task needs to protect itself from the execution of all
interrupts or some specific ones, it must disable them by using the available
primitives. Disabling interrupts is a mechanism frequently used within the
MTask kernel in order to serialize some critical sections. It is recommended,
however, that applications refrain from using it unless it is absolutely
necessary.
Tasks run with static or fixed priorities,
which may be changed any time. Priorities will not be dynamically adjusted by
the kernel, as is usually done in general-purpose operating systems like Unix.
Fixed priorities are more adequate for dedicated systems, since they guarantee
predictable behaviour.
The scheduler in MTask uses a very simple
algorithm: it just assigns the CPU to the highest priority task among those
ready to run. If two or more ready tasks share the highest priority, the CPU is
assigned to them in a round-robin fashion according to the time slice.
Task queues are used by MTask to manage waiting, delaying
and ready tasks. They are implemented as doubly-linked, sorted lists. For all
queues, except the time queue, the sorting criterion is task priority. When a
task is inserted in a priority-sorted queue, it will be placed immediately
before any other task in the queue with the same or higher priority. This means
that the last task in the queue will always be the one with the highest
priority, or, among two or more with the same (highest) priority, the one that
has been waiting for the longest.
All ready tasks wait for the CPU in one such
queue, the ready queue. The scheduler assigns the CPU to the last task
in this queue.
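The insertion rule above can be sketched with a small doubly-linked list. This is only an illustration under stated assumptions: Node, Queue, enqueue() and dequeue_last() are hypothetical names, not MTask's real Task_t or queue functions, and priorities are modelled as plain unsigned numbers where a larger value means a higher priority.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified task record: not MTask's real Task_t. */
typedef struct Node {
    int id;
    unsigned prio;              /* larger value = higher priority */
    struct Node *prev, *next;
} Node;

typedef struct {
    Node *head;                 /* lowest priority end */
    Node *tail;                 /* highest priority, longest waiting */
} Queue;

/* Insert n immediately before the first node (walking from the head)
   whose priority is the same or higher. */
void enqueue(Queue *q, Node *n)
{
    Node *p;
    assert(q != NULL && n != NULL);
    p = q->head;
    while (p != NULL && p->prio < n->prio)
        p = p->next;
    n->next = p;
    n->prev = p ? p->prev : q->tail;
    if (n->prev) n->prev->next = n; else q->head = n;
    if (p) p->prev = n; else q->tail = n;
}

/* Unlink and return the last node, or NULL if the queue is empty.
   This is the node a scheduler following the text would pick. */
Node *dequeue_last(Queue *q)
{
    Node *n;
    assert(q != NULL);
    n = q->tail;
    if (n == NULL)
        return NULL;
    q->tail = n->prev;
    if (q->tail) q->tail->next = NULL; else q->head = NULL;
    n->prev = n->next = NULL;
    return n;
}
```

Because a new node goes in front of existing nodes of equal priority, taking nodes from the tail serves equal-priority tasks in arrival order, which is what gives the round-robin behaviour described earlier.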
The terminated queue is used for holding
tasks that have terminated themselves, whose resources have not yet been freed.
Memory allocation functions flush this queue and free the associated memory.
The time queue is sorted by increasing wakeup time. There is a variable called
msecs in each task’s control block, which is used to count the milliseconds
remaining until wakeup. For the first task in the time queue this is the
absolute remaining time; for the rest, the msecs variable holds the difference
in wakeup time from the previous task. The timer interrupt decrements the msecs
variable of the first task in the time queue; when it reaches zero, the task is
removed from the time queue and made ready. The second task then comes to the
front of the queue; its msecs variable is now the absolute remaining time until
wakeup, and so on.
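The differential encoding described above is sometimes called a delta queue. The following is a minimal sketch of the idea, not MTask's code: Sleeper, time_enqueue() and time_tick() are hypothetical names, and a singly-linked list is used for brevity.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sleeper record; only the msecs field mirrors the text. */
typedef struct Sleeper {
    int id;
    unsigned msecs;             /* delta from the previous entry */
    struct Sleeper *next;
} Sleeper;

/* Insert s so that it wakes up `delay` milliseconds from now. */
void time_enqueue(Sleeper **head, Sleeper *s, unsigned delay)
{
    assert(head != NULL && s != NULL);
    while (*head != NULL && (*head)->msecs <= delay) {
        delay -= (*head)->msecs;        /* make delay relative */
        head = &(*head)->next;
    }
    s->msecs = delay;
    s->next = *head;
    if (s->next != NULL)
        s->next->msecs -= delay;        /* successor is now relative to s */
    *head = s;
}

/* Advance time by `elapsed` milliseconds (one timer tick). Expired
   entries are unlinked and returned as a list in wakeup order. */
Sleeper *time_tick(Sleeper **head, unsigned elapsed)
{
    Sleeper *ready = NULL, **tail = &ready;
    assert(head != NULL);
    while (*head != NULL && (*head)->msecs <= elapsed) {
        Sleeper *s = *head;
        elapsed -= s->msecs;
        *head = s->next;
        s->msecs = 0;
        s->next = NULL;
        *tail = s;
        tail = &s->next;
    }
    if (*head != NULL)
        (*head)->msecs -= elapsed;      /* only the front entry is touched */
    return ready;
}
```

The benefit of this layout is that the timer interrupt normally updates a single counter, no matter how many tasks are sleeping.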
Tasks are in the time queue when they are in
the delaying state, or in the waiting, sending or receiving
states in case the blocking primitive was called with a finite timeout.
At any given time a task may be found in no
queue at all, in a task queue, in the time queue, or in both. These are the
possible combinations, according to the task’s state:
· No queue: states suspended, running, receiving (without timeout).
· Ready queue: state ready to run.
· Time queue: states delaying, receiving (with timeout).
· Task queue: states waiting in task queue (without timeout), sending (without timeout).
· Task queue and time queue: states waiting in task queue (with timeout),
sending (with timeout).
· Terminated queue: state terminated.
The main task and the null task are created
when MTask is initialized. The main task runs the program’s main code with
default priority. It runs like any other task, but it has some special
features. Since it has not been created with CreateTask(), it cannot be
deleted with DeleteTask() and it cannot call Exit(), which is
equivalent to DeleteTask(CurrentTask()). The main task can only
terminate by calling the C library function exit() or returning from main(),
and then the whole program terminates.
Some constants defined in mtask.h:
MIN_PRIO     = 0     Priority of the null task
DEFAULT_PRIO = 50    Priority of the main task
MAX_PRIO     = -1    Maximum priority
FOREVER      = -1    Infinite timeout
MTask API
The API is defined in the header
file mtask.h.
Object constructors (functions
starting with Create...) accept a name string for the object. This name
is not used by the kernel, but it might be useful for debugging. A dynamic copy
of the string is stored in the object. Pass NULL if you are not interested in
giving the object a name.
MTask initialization
void InitMTask(Time_t mspertick, Time_t timeslice)
Initializes MTask’s kernel and
creates the main and null tasks. It must be called before any other MTask
function.
MTask uses the real time interrupt
(IRQ 0) as a base for time measurements. In a standard PC running DOS, this
interrupt is triggered once every 55 milliseconds. The mspertick
argument, if different from zero, reprograms the real-time counter and sets
this interval to any number of milliseconds from 1 to 55. A high timer
frequency (maximum 1 kHz when mspertick == 1) provides a finer resolution
in the measurement of delay and timeout intervals. The default timer interval
(55 milliseconds) is restored upon program termination.
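As a worked example of the arithmetic behind mspertick, the sketch below computes a timer divisor for a requested tick length. It assumes the standard PC interval timer, which is clocked at 1,193,182 Hz and has a 16-bit counter; this document does not show how MTask itself computes the value, and pit_divisor() and tick_usecs() are hypothetical names.

```c
#include <assert.h>

#define PIT_HZ 1193182UL    /* input clock of the PC interval timer, in Hz */

/* Divisor that makes the timer fire roughly every `ms` milliseconds.
   The counter is 16 bits wide and a programmed value of 0 counts 65536,
   so the result is capped at 65536 (the DOS default, about 54.9 ms). */
unsigned long pit_divisor(unsigned ms)
{
    unsigned long d;
    assert(ms >= 1 && ms <= 55);
    d = (PIT_HZ * ms + 500UL) / 1000UL;     /* rounded to nearest count */
    return d > 65536UL ? 65536UL : d;
}

/* Real tick length in microseconds for a given divisor, rounded down. */
unsigned long tick_usecs(unsigned long divisor)
{
    return (unsigned long)(divisor * 1000000ULL / PIT_HZ);
}
```

Since the divisor must be a whole number of counts, the real tick is usually a little shorter or longer than the requested number of milliseconds. For a 1 ms request the divisor is 1193, which gives a tick slightly under 1 ms.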
The timeslice argument, if
different from zero, establishes the number of milliseconds that a task may run
before being preempted when another task of the same priority is ready. The
default value is two default timer ticks, or 110 milliseconds.
Task creation and destruction
Task_t *CreateTask(TaskFunc_t func, unsigned stacksize, void *arg, char *name, Prio_t priority)
Task_t *CurrentTask(void)
void DeleteTask(Task_t *task)
CreateTask() creates a new task using the func
body, which receives arg as its argument. The initial state is TaskSuspended.
It allocates a stack of stacksize bytes. To run the task, call Ready().
CurrentTask() returns a pointer to the calling task.
DeleteTask() destroys a task. This function may not be
applied to the main task.
void ProtectMath(Task_t *task)
If more than one task uses the 80387 arithmetic coprocessor, each of them
should call this function to enable saving and restoring the coprocessor status
during context switches. Digital Mars math emulation is reentrant, so this
protection is only necessary if a coprocessor is used.
Prio_t GetPriority(Task_t *task)
void SetPriority(Task_t *task, Prio_t priority)
If you change the priority of a task
that is currently in a priority-sorted queue (i.e. any queue except the time
queue), the task is removed from the queue and then re-inserted in it in order
to reflect its new priority. If you change the priority of either the current
task or one of the ready tasks the scheduler is called, since the new priority
setting may dictate the need to reassign the CPU.
State changes
void Suspend(Task_t *task)
void Ready(Task_t *task)
Suspend() blocks a task without placing it in
any queue. The only way to make a suspended task ready again is calling Ready().
Ready() makes a task ready to run, no matter its
previous state. If the task was blocked as a result of calling WaitQueue(),
Send() or Receive(), these functions will return false
indicating failure.
Task queues
TaskQueue_t *CreateQueue(char *name)
void DeleteQueue(TaskQueue_t *queue)
bool WaitQueue(TaskQueue_t *queue)
bool WaitQueueTimed(TaskQueue_t *queue, Time_t msecs)
bool SignalQueue(TaskQueue_t *queue)
void FlushQueue(TaskQueue_t *queue, bool success)
Task queues offer an IPC mechanism
similar to the sleep() and wakeup() primitives of the Unix
kernel. Tasks can use the queues to wait for events; signalling the queue
generates the event and wakes up the waiting task. A basic limitation of this
mechanism is that task queues have no way of remembering events that are
signalled before some task waits for them; these events are lost. Task queues are very
useful, however, as a basic building block to construct higher-level IPC
primitives. For example, a semaphore can be easily built using a task queue and
a counter to remember signalled, but not yet consumed events.
CreateQueue() creates a task queue; DeleteQueue()
destroys it. If there are tasks waiting in the queue when it is destroyed,
their waiting operations fail.
The WaitQueue...() functions
are for waiting in a queue; the task enters the TaskWaiting state. True
as return status indicates success, false means failure. Failure may
be due to timeout, Ready() or queue flushing.
SignalQueue() wakes up the highest priority
(last) task in the queue, whose WaitQueue...() operation completes successfully.
This function returns true if a task was awakened, and false if
the queue was empty (i.e. the signalled event was lost).
FlushQueue() wakes up all tasks in the queue,
which complete their respective WaitQueue...() operations with the
given status.
bool Send(Task_t *to, void *msg, unsigned size)
bool SendCond(Task_t *to, void *msg, unsigned size)
bool SendTimed(Task_t *to, void *msg, unsigned size, Time_t msecs)
bool Receive(Task_t **from, void *msg, unsigned *size)
bool ReceiveCond(Task_t **from, void *msg, unsigned *size)
bool ReceiveTimed(Task_t **from, void *msg, unsigned *size, Time_t msecs)
MTask’s message passing uses the so-called rendezvous
system, which gives the highest possible level of coupling between sender
and receiver. The message is transferred directly between the participating
tasks without intermediate buffering. The first task that calls a messaging
primitive (send or receive) blocks until its peer performs the matching
operation.
Sending or receiving a message bigger than the
receiver’s buffer will produce a fatal error.
Receive() gets as its first argument (from) a
pointer to a pointer to a task. This may be used to select a sender to receive
messages from, or to get a pointer to the sender once the message is received.
The possible scenarios are as follows:
· from == NULL: receive a message from any task.
· from != NULL, *from == NULL: receive a message from any task; once received,
*from points to the sender.
· from != NULL, *from == pointer to sender: receive a message from the given sender only.
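The three cases can be condensed into a single matching rule. The sketch below is a self-contained model of that rule, not MTask code: Task stands in for Task_t, and accepts() and note_sender() are hypothetical helpers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for Task_t. */
typedef struct Task {
    int id;
} Task;

/* Nonzero if a receiver that passed `from` would take a message
   coming from `sender`. */
int accepts(Task **from, Task *sender)
{
    assert(sender != NULL);
    return from == NULL || *from == NULL || *from == sender;
}

/* What happens to the caller's variable once a message is delivered. */
void note_sender(Task **from, Task *sender)
{
    if (from != NULL)
        *from = sender;
}
```

One consequence of this rule, if the model is accurate, is that a task receiving in a loop from any sender should reset its variable to NULL before each call; otherwise the second call would only accept messages from whoever sent the first one.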
In order to receive the message contents, a
pointer to a suitable buffer (msg) is passed. If you are not interested
in the message itself, msg can be NULL. In this case the messaging
system may be used just for task synchronization. If msg is not NULL,
the size argument points to an integer variable where the size of the
buffer is initially stored. Upon reception of the message, this variable will
hold the real message size. Messages bigger than the buffer generate a fatal
error.
ReceiveCond() and ReceiveTimed() are the conditional
(non-blocking) and timeout versions of Receive().
Messages are transmitted using Send() or
its conditional or timeout versions, SendCond() and SendTimed().
These functions may be used just for synchronization purposes by passing NULL
for the msg pointer. In the blocking calls, if the receiver is not
waiting for a message from the sending task or from any, the sender is blocked
in state TaskSending and waits in a task queue associated to the
receiver.
These functions control the state of the
calling task:
void Pause(void)
void Scheduler(void)
void Yield(void)
void Delay(Time_t msecs)
void DelayUntil(Time_t deadline)
void Exit(void)
Pause() suspends the current task – it is equivalent
to Suspend(CurrentTask()).
Scheduler() calls MTask’s scheduler, giving it an
opportunity to reassign the CPU. It is used mostly by tasks running in cooperative
mode, and it is placed in the stack when returning from a first-level interrupt
to take care of preemptions. This function will relinquish the CPU if some
other task of higher priority is ready, or if another task of the same priority
is ready and the current task’s time slice is exhausted.
Yield() is like Scheduler(), but will reassign
the CPU to any ready task of the same or higher priority, even if the time
slice is not yet exhausted.
Delay() waits for a given number of milliseconds.
DelayUntil() waits until some time in the future. If the
deadline has already expired, it does not block. This function gives better
accuracy than Delay() for periodic tasks.
Exit() terminates the calling task, and is equivalent
to DeleteTask(CurrentTask()). This function is automatically called when
a task returns from its function body. It cannot be called from the main task.
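The remark that DelayUntil() is more accurate than Delay() for periodic tasks can be checked with a little arithmetic. The sketch below is an idealized model (it ignores timer tick granularity and scheduling latency), and both function names are hypothetical. It compares a loop that does some work and then calls Delay(period) with a loop that keeps an absolute deadline, advances it by period on each pass and calls DelayUntil(deadline).

```c
#include <assert.h>

/* Time (ms) at which iteration n starts if every pass does `work` ms
   of processing and then sleeps for a relative `period`. */
unsigned long start_with_delay(unsigned n, unsigned period, unsigned work)
{
    assert(period > 0);
    return (unsigned long)n * (period + work);
}

/* Same loop, but sleeping until an absolute deadline that advances
   by `period` on every pass. */
unsigned long start_with_delay_until(unsigned n, unsigned period, unsigned work)
{
    unsigned long now = 0, deadline = 0;
    unsigned i;
    assert(period > 0);
    for (i = 0; i < n; i++) {
        now += work;            /* processing */
        deadline += period;     /* next absolute wakeup time */
        if (deadline > now)     /* an expired deadline does not block */
            now = deadline;
    }
    return now;
}
```

With a 100 ms period and 7 ms of work per pass, the relative version has drifted by 70 ms after ten iterations, while the deadline version is still on schedule. In a real task the initial deadline would presumably be taken from GetTime().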
void Atomic(void)
void Unatomic(void)
Atomic() increments the atomic_level field of
the current task’s control block. When this field has a value greater than
zero, the task runs in cooperative mode. Unatomic() decrements the atomic_level
field; when it reaches zero, the scheduler is called to make up for possibly
delayed context switches, and the task runs in preemptive mode thereafter.
These functions may be nested, which
facilitates their use in libraries.
typedef bool (*Handler_t)(unsigned irq)
void SetHandler(unsigned irq, Handler_t handler)
void EndInterrupt()
void DisableIRQ(unsigned irq)
void EnableIRQ(unsigned irq)
MTask traps hardware interrupts on demand. The
MTask kernel traps the real time interrupt (IRQ 0) internally; all other IRQs
may be trapped by the application. The interrupt module handles switching to
internal stacks, supports nested interrupts and tracks the interrupt level in
order to allow for preemption when returning from first-level interrupts
occurring in the application code.
User interrupt handlers must be installed using
SetHandler(). The handler is a standard C function which receives the
IRQ number as an argument and must return true if the interrupt is
handled completely, or false to indicate that control should be chained
to the previous handler. If the interrupt is handled completely, the handler
should call EndInterrupt() before returning; this sends the EOI command
to the PICs. If you want to un-trap an IRQ and restore the original vector,
call SetHandler(irq, NULL). Trying to trap IRQ 0 causes Panic().
If you need to execute some application code on each timer interrupt, use
SetTimerCallback() instead (see below).
Individual hardware interrupts may be enabled
or disabled by calling EnableIRQ() and DisableIRQ().
Upon program termination, the original vectors
and PIC masks are restored.
void *Malloc(unsigned size)
void *Realloc(void *mem, unsigned size)
char *StrDup(char *str)
void Free(void *mem)
These functions are thread-safe
versions of their lowercase counterparts in the C library. They also have some
additional features: memory allocated by Malloc() is filled with
zeroes, Realloc(), StrDup() and Free() tolerate null
pointers. Before allocating memory, the terminated tasks queue is flushed and
the associated memory returned to the heap. Malloc() will never return
NULL; memory exhaustion will produce a fatal error. Realloc() and StrDup()
may only return NULL if passed a NULL pointer argument.
Time_t GetTime(void)
This function returns the number of
milliseconds elapsed since MTask was initialized. Its absolute value may not be
very accurate, depending on the duration of the timer tick, because of rounding
errors when the timer count value is calculated. It is not intended to be used
for absolute time measurements, but rather differentially to measure time
intervals and to calculate deadlines for DelayUntil() relative to the
current time.
void SetData(Task_t *task, void *data)
A convenient way of assigning a task a pointer
to private data. Not used by the kernel.
typedef void (*Switcher_t)(Task_t *save, Task_t *restore)
void SetSwitcher(Switcher_t switcher)
MTask takes care of saving and restoring the register
and coprocessor context of tasks. Users may want to save and restore other
contexts; this is accomplished by installing a suitable switcher function. This
switcher is called immediately before MTask’s own context switch; it receives
pointers to the task which is about to relinquish the CPU, whose context must
be saved, and to the next task to run, whose context must be restored. This
function is called with interrupts disabled, and must not enable them.
typedef void (*Callback_t)(void)
void SetTimerCallback(Callback_t callback)
Sets a pointer to a function to be called on
each timer interrupt.
void Panic(char *msg)
Called in case of fatal errors. It prints an
error message to standard error and exits the program.
void DisableInts()
void RestoreInts()
MTask keeps a global interrupt disabling
counter; interrupts are disabled when the count is greater than zero. As this
counter is saved and restored with each context switch, the status of
interrupts is private for each task, i.e., some tasks may run with interrupts
disabled and some others with interrupts enabled. DisableInts() disables
interrupts and increments the counter; RestoreInts() decrements the
counter and enables interrupts when it reaches zero. These functions may be
nested, which facilitates their use in libraries.
Interrupt handlers may only call a small subset of MTask’s API, namely,
non-blocking functions that may wake up sleeping tasks:
· Ready()
· SignalQueue()
· FlushQueue()
· SendCond()
Note that only the conditional form of the
message sending primitives may be used, since it is the only one guaranteed not
to block. Interrupts can send messages only to tasks waiting for them; this
situation is typical in all message-driven systems (see the case of Minix in Operating
Systems – Design and Implementation by A. Tanenbaum).
Semaphores
The semaphore API is defined in sem.h.
typedef struct Semaphore_t
{
    unsigned value;
    TaskQueue_t *queue;
}
Semaphore_t;
A semaphore is implemented by a task queue and a counter (the semaphore value)
that takes care of previously signalled but not yet consumed events.
Semaphore_t *CreateSem(char *name, unsigned value)
void DeleteSem(Semaphore_t *sem)
The semaphore is created with an initial value.
bool WaitSem(Semaphore_t *sem)
bool WaitSemCond(Semaphore_t *sem)
bool WaitSemTimed(Semaphore_t *sem, Time_t msecs)
These functions implement the basic DOWN semaphore primitive in its three flavours:
without timeout, with timeout and conditional. If the semaphore value is
greater than zero, DOWN decrements the value and returns success. This
operation consumes a previously signalled event. If the semaphore value is
zero, DOWN either fails (in the conditional version) or blocks the calling task
in the semaphore’s task queue.
void SignalSem(Semaphore_t *sem)
This function implements the basic UP semaphore primitive. It signals an event; if
tasks are waiting in the semaphore’s queue, it wakes up the one with the
highest priority and makes it ready; this task will complete its DOWN
operation successfully. If no task is awakened, the semaphore value is
incremented to account for a not-yet-consumed event.
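The interplay between the counter and the task queue can be modelled without any real blocking. In the sketch below a plain count of waiters stands in for the tasks asleep in the semaphore's task queue; SemModel, down(), down_cond() and up() are hypothetical names, not the functions in sem.h.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    unsigned value;     /* signalled but not yet consumed events */
    unsigned waiting;   /* stands in for tasks asleep in the task queue */
} SemModel;

/* Conditional DOWN: consume an event if there is one, never block. */
bool down_cond(SemModel *s)
{
    assert(s != NULL);
    if (s->value == 0)
        return false;
    s->value--;
    return true;
}

/* Blocking DOWN, modelled: returns true if it completed at once;
   otherwise the caller is counted as a waiter and false is returned. */
bool down(SemModel *s)
{
    if (down_cond(s))
        return true;
    s->waiting++;
    return false;
}

/* UP: hand the event to a waiter if there is one (returns true),
   otherwise remember it in the counter (returns false). */
bool up(SemModel *s)
{
    assert(s != NULL);
    if (s->waiting > 0) {
        s->waiting--;
        return true;
    }
    s->value++;
    return false;
}
```

Note that in this model the value and the number of waiters are never both nonzero: an event is either handed directly to a sleeping task or stored for a later DOWN.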
unsigned ValueSem(Semaphore_t *sem)
Tells the semaphore value, i.e., the number of previously signalled but not yet
consumed events.
void FlushSem(Semaphore_t *sem, bool wait_ok)
This is like SignalSem(), but it wakes
up all tasks waiting in the semaphore. The completion status of their DOWN operations
is given as the second argument.
The following semaphore primitives may be
safely called from interrupt handlers:
· ValueSem()
· SignalSem()
· FlushSem()
Mutexes
The mutex API is defined in mutex.h
typedef struct Mutex_t
{
    unsigned use_count;
    Task_t *owner;
    Semaphore_t *sem;
}
Mutex_t;
Mutexes are recursive mutual exclusion objects. A semaphore with an initial
value of 1 might be used to the same effect, but it would not allow recursive
occupation of the critical zone, which is a useful feature in libraries. The
mutex uses a mutual exclusion semaphore, but adds some administration to allow
recursive occupation. The use_count field tracks the nesting level of recursive
occupations; the owner field indicates which task is currently in the
critical zone. Initial values for an unoccupied mutex are 1 for the semaphore
count, 0 for use_count and NULL for owner.
When a mutex is first occupied by a task, owner points to the occupying task
and use_count is 1. Nested occupations increment the value of use_count.
The semaphore value of an occupied mutex is always 0. Tasks waiting to enter
the mutex sleep in the semaphore’s queue.
Mutex_t *CreateMutex(char *name)
void DeleteMutex(Mutex_t *mut)
These functions create an initially free mutex and destroy it.
bool EnterMutex(Mutex_t *mut)
bool EnterMutexCond(Mutex_t *mut)
bool EnterMutexTimed(Mutex_t *mut, Time_t msecs)
Functions to occupy a mutex in three flavours: with and without timeout, and conditional.
The first two may block if the mutex is owned by another task. Once occupied
for the first time, each nested occupation by the same task merely increments
the use_count.
void LeaveMutex(Mutex_t *mut)
Decrements the use_count and frees the
mutex when this count reaches zero. If tasks are waiting to enter the mutex,
the highest-priority one is then awakened. In order to free a mutex, this
function must be called once for every successful occupation.
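The bookkeeping that makes the mutex recursive can be sketched in a few lines. This is a model only: task identities are plain integers (0 meaning no owner, where the real structure uses NULL), the underlying semaphore is left out, and MutexModel, enter_cond() and leave() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    unsigned use_count;     /* nesting level of the current owner */
    int owner;              /* hypothetical task id, 0 = unoccupied */
} MutexModel;

/* Conditional enter by task `self` (nonzero id): succeeds if the
   mutex is free or already owned by the caller. */
bool enter_cond(MutexModel *m, int self)
{
    assert(m != NULL && self != 0);
    if (m->use_count > 0 && m->owner != self)
        return false;
    m->owner = self;
    m->use_count++;
    return true;
}

/* Leave: returns true only when the mutex actually became free,
   which is the point where a waiting task would be awakened. */
bool leave(MutexModel *m, int self)
{
    assert(m != NULL && m->use_count > 0 && m->owner == self);
    if (--m->use_count > 0)
        return false;
    m->owner = 0;
    return true;
}
```

A task that entered twice must leave twice before another task can get in, which matches the rule that the leave function is called once for every successful occupation.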
Mutex primitives may not be called from
interrupt handlers.
Monitors
Monitors and condition variables are
implemented in this module. The API is in monitor.h
typedef struct
{
    Task_t *owner;
    Semaphore_t *sem;
}
Monitor_t;
Monitors are basically non-recursive mutexes. Compare a monitor’s structure with that of
a mutex: the use_count field is missing.
Monitor_t *CreateMonitor(char *name)
void DeleteMonitor(Monitor_t *mon)
bool EnterMonitor(Monitor_t *mon)
bool EnterMonitorCond(Monitor_t *mon)
bool EnterMonitorTimed(Monitor_t *mon, Time_t msecs)
void LeaveMonitor(Monitor_t *mon)
The basic API is similar to the mutex API. However, the real potential of monitors
as IPC devices lies in the use of condition variables.
typedef struct
{
    Monitor_t *monitor;
    TaskQueue_t *queue;
}
Condition_t;
Condition variables are associated with monitors. They hold a task queue where
tasks may sleep while waiting for some condition to be fulfilled, and a pointer
to the associated monitor. Condition variables are created and destroyed by:
Condition_t *CreateCondition(char *name, Monitor_t *mon)
void DeleteCondition(Condition_t *cond)
Tasks waiting for a condition to be fulfilled execute one of the following:
bool WaitCondition(Condition_t *cond)
bool WaitConditionTimed(Condition_t *cond, Time_t msecs)
and tasks wishing to wake up the sleeping tasks execute:
bool SignalCondition(Condition_t *cond)
As has been said before, a limitation of task queues as an IPC mechanism lies in
the fact that events signalled before tasks wait for them are lost. The WaitCondition()
primitive avoids this by atomically doing the following:
1. Leave the monitor (giving another task the opportunity to enter it and signal
the condition).
2. Sleep in the condition variable’s task queue.
3. Re-enter the monitor.
As tasks must enter the monitor either to wait on a condition or to signal it,
critical races between verifying, waiting and signalling are avoided.
The monitor primitives may not be called by
interrupt handlers.
Pipes
The pipe API is defined in pipe.h
Pipes allow unidirectional flow of data between tasks. This information is
treated by the pipe as a mere stream of bytes. Tasks putting bytes in the
writing end of the pipe may be considered producers, and tasks getting the
bytes from the reading end may be considered consumers. This implementation of
pipes supports multiple producers and consumers operating on the pipe
simultaneously. Pipes are also called FIFOs (first-in first-out devices).
The pipe has an internal buffer that holds all bytes produced but not yet
consumed. The size of this buffer may be chosen arbitrarily, since the pipe
guarantees it will never overflow. Tasks trying to write to a full pipe and
tasks trying to read from an empty pipe will block. A small buffer provides
tight coupling between producers and consumers: since the buffer fills very
quickly, they are forced to run in lockstep. A big buffer provides loose
coupling; a producer might even produce its whole output and leave it in the
pipe’s buffer before a consumer reads the first byte.
The implementation is based on the following object:
typedef struct
{
    Mutex_t *mutex_get;
    Mutex_t *mutex_put;
    Monitor_t *monitor;
    Condition_t *cond_get;
    Condition_t *cond_put;
    unsigned size;
    unsigned avail;
    char *buf;
    char *head;
    char *tail;
    char *end;
}
Pipe_t;
The buf field is a pointer to the dynamically allocated internal buffer; head and
tail are pointers for implementing the classic circular buffer algorithm;
end merely points to the last-plus-one byte of the buffer for fast
wraparound. The size field is the size of the buffer, and avail the number
of bytes currently stored in it. As the pipe semantics regarding blocking and
awakening of tasks are rather complex, a monitor was chosen as the basic IPC
device since it offered a straightforward solution. Tasks trying to write to a
full pipe or trying to read from an empty pipe sleep on the two condition
variables, cond_put and cond_get. Finally, the two
optional mutexes mutex_put and mutex_get allow for mutual
exclusion between multiple producers and consumers, respectively, guaranteeing
that writes and reads are atomic. They need not be used in single producer or
single consumer situations.
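The role of the buf, head, tail, end, size and avail fields can be illustrated with a stripped-down ring buffer that never blocks, roughly in the spirit of the conditional pipe primitives. This is a sketch under assumptions, not the pipe module's code: Ring and its functions are hypothetical, and the choice of writing at tail and reading at head is a guess, since the text does not say which pointer serves which end.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    unsigned size;      /* capacity of the buffer in bytes */
    unsigned avail;     /* bytes currently stored */
    char *buf;          /* start of the storage */
    char *head;         /* next byte to read (assumed) */
    char *tail;         /* next free byte to write (assumed) */
    char *end;          /* one past the last byte, for wraparound */
} Ring;

void ring_init(Ring *r, char *storage, unsigned size)
{
    assert(r != NULL && storage != NULL && size > 0);
    r->size = size;
    r->avail = 0;
    r->buf = r->head = r->tail = storage;
    r->end = storage + size;
}

/* Store as many of the n bytes as fit; return how many were stored. */
unsigned ring_put(Ring *r, const char *data, unsigned n)
{
    unsigned i, room = r->size - r->avail;
    if (n > room)
        n = room;
    for (i = 0; i < n; i++) {
        *r->tail++ = data[i];
        if (r->tail == r->end)
            r->tail = r->buf;       /* wrap around */
    }
    r->avail += n;
    return n;
}

/* Fetch up to n bytes; return how many were fetched. */
unsigned ring_get(Ring *r, char *data, unsigned n)
{
    unsigned i;
    if (n > r->avail)
        n = r->avail;
    for (i = 0; i < n; i++) {
        data[i] = *r->head++;
        if (r->head == r->end)
            r->head = r->buf;       /* wrap around */
    }
    r->avail -= n;
    return n;
}
```

Keeping an explicit avail count removes the usual ambiguity of circular buffers, where head == tail could mean either empty or full.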
Pipes are created and destroyed by the following primitives:
Pipe_t *CreatePipe(char *name, unsigned size, bool serialized_get, bool serialized_put)
void DeletePipe(Pipe_t *p)
The size argument is the size of the internal buffer, and the two booleans
indicate whether the serializing mutexes will be used.
These are the reading and writing primitives, which mimic quite closely the
semantics of Unix pipes:
unsigned GetPipe(Pipe_t *p, void *data, unsigned size, bool get_all)
unsigned GetPipeTimed(Pipe_t *p, void *data, unsigned size, Time_t msecs, bool get_all)
unsigned GetPipeCond(Pipe_t *p, void *data, unsigned size)
unsigned PutPipe(Pipe_t *p, void *data, unsigned size)
unsigned PutPipeTimed(Pipe_t *p, void *data, unsigned size, Time_t msecs)
unsigned PutPipeCond(Pipe_t *p, void *data, unsigned size)
PutPipe...() and GetPipe...() allow writing to and reading from a pipe. All of
them return the number of bytes written or read.
PutPipe() writes all given bytes (size); in so doing, it may block one or
more times.
PutPipeTimed() behaves like PutPipe(), but it may return prematurely because of
timeout.
PutPipeCond() will write as many of the given bytes as possible and will never block.
If the get_all argument is true, GetPipe() and GetPipeTimed()
will behave as exact mirrors of PutPipe() and PutPipeTimed(). If get_all
is false, however, they will only block if the pipe is empty, and
will return as soon as it has something to be read, even if the number of bytes
is less than size. This is the most frequently required semantics, since
usually size is just an upper limit in order not to overflow the reading
buffer. This is also the semantics of Unix reads from a pipe. GetPipeCond()
will never block. The following function tells the number of bytes currently
stored in the pipe:
unsigned AvailPipe(Pipe_t *p)
Pipe primitives may not be called from
interrupt handlers.
Message queues
The message queue API is defined in msgqueue.h.
The message queue is a FIFO (first-in
first-out device) holding fixed size messages. The implementation is a
straightforward circular buffer algorithm with two semaphores (one for occupied
message slots, another one for free message slots):
typedef struct
{
    Mutex_t *mutex_get;
    Mutex_t *mutex_put;
    Semaphore_t *sem_get;
    Semaphore_t *sem_put;
    unsigned msg_size;
    char *buf;
    char *head;
    char *tail;
    char *end;
}
MsgQueue_t;
The buf field points to the dynamically allocated buffer. Its size equals the product
of the maximum number of messages in the queue and the message size (msg_size).
The head and tail fields are pointers for implementing the circular buffer
algorithm; end points to the last-plus-one byte in the buffer for fast
wraparound. The two semaphores sem_get and sem_put are
initialized to zero and the maximum number of messages, respectively. The two
optional mutexes, mutex_get and mutex_put, need only be used if
multiple message consumers or producers are present. Message queues are created
and destroyed by:
MsgQueue_t *CreateMsgQueue(char *name, unsigned msg_max, unsigned msg_size,
bool serialized_get, bool serialized_put)
void DeleteMsgQueue(MsgQueue_t *mq)
The msg_max parameter is the queue capacity (maximum number of messages), msg_size
is the size of each message, and the two booleans indicate whether more than
one consumer or producer will be using the queue at a time.
These are the queue handling primitives:
bool GetMsgQueue(MsgQueue_t *mq, void *msg)
bool GetMsgQueueCond(MsgQueue_t *mq, void *msg)
bool GetMsgQueueTimed(MsgQueue_t *mq, void *msg, Time_t msecs)
bool PutMsgQueue(MsgQueue_t *mq, void *msg)
bool PutMsgQueueCond(MsgQueue_t *mq, void *msg)
bool PutMsgQueueTimed(MsgQueue_t *mq, void *msg, Time_t msecs)
unsigned AvailMsgQueue(MsgQueue_t *mq)
The Get...() and Put...() primitives read one message from the queue or write one
into it, respectively. The return status is just success or failure. The
conditional versions will never block; the unconditional ones will block when
trying to read a message from an empty queue or trying to write a message to a
full queue; the timed versions may return prematurely because of timeout. The AvailMsgQueue()
function tells how many unread messages are currently stored in the queue.
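The two-semaphore scheme can be modelled with two plain counters, one for occupied slots and one for free slots. The sketch below only covers the conditional (non-blocking) case and is not the module's code: MsgQueueModel and the mq_ functions are hypothetical, the semaphores are reduced to their values, and writing at tail and reading at head is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    unsigned sem_get;       /* occupied slots (models sem_get's value) */
    unsigned sem_put;       /* free slots (models sem_put's value) */
    unsigned msg_size;      /* size of each message in bytes */
    char *buf, *head, *tail, *end;
} MsgQueueModel;

void mq_init(MsgQueueModel *q, char *storage, unsigned msg_max, unsigned msg_size)
{
    assert(q != NULL && storage != NULL && msg_max > 0 && msg_size > 0);
    q->sem_get = 0;                 /* nothing to read yet */
    q->sem_put = msg_max;           /* every slot is free */
    q->msg_size = msg_size;
    q->buf = q->head = q->tail = storage;
    q->end = storage + msg_max * msg_size;
}

/* Conditional put: fails instead of blocking when the queue is full. */
bool mq_put_cond(MsgQueueModel *q, const void *msg)
{
    if (q->sem_put == 0)
        return false;
    q->sem_put--;                   /* DOWN on the free-slot count */
    memcpy(q->tail, msg, q->msg_size);
    q->tail += q->msg_size;
    if (q->tail == q->end)
        q->tail = q->buf;
    q->sem_get++;                   /* UP on the occupied-slot count */
    return true;
}

/* Conditional get: fails instead of blocking when the queue is empty. */
bool mq_get_cond(MsgQueueModel *q, void *msg)
{
    if (q->sem_get == 0)
        return false;
    q->sem_get--;                   /* DOWN on the occupied-slot count */
    memcpy(msg, q->head, q->msg_size);
    q->head += q->msg_size;
    if (q->head == q->end)
        q->head = q->buf;
    q->sem_put++;                   /* UP on the free-slot count */
    return true;
}
```

In this model the two counters always add up to the queue capacity once an operation has completed, which is a handy invariant to check when debugging.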
The conditional versions of Get...() and
Put...() may be called from interrupt handlers, but you must be careful
to create the queue without a mutex in the end that will be used by the
interrupt, i.e., specify false for the serialized_get or serialized_put
parameter of CreateMsgQueue().