MTask

Preemptive multitasking executive for protected mode DOS

 

(Digital Mars version 2.01)

 

Introduction

 

MTask is a fully preemptive multitasking executive for PCs with a single 80386 or higher CPU. It supports fixed priorities and time slicing. This version is written for the Digital Mars C compiler with the X32 DOS extender, and generates protected mode executables that run on plain DOS. Programs using MTask should be compiled with the –mx memory model flag and linked with mtask.lib and x32.lib (the version of X32 that does not use virtual memory – see the limitations below).

 

Limitations

 

This implementation is intended to run on plain DOS without using virtual memory, since it is basically aimed at embedded systems. For this reason, no memory locking is performed in the interrupt handling sections of the code. It will also run in non-preemptive mode in Windows 95 and 98 DOS boxes; due to some nasty details in the handling of protected mode hardware interrupts, preemptions do not work in these environments.

 

Preemptions are not enabled for interrupts originating in real mode or X32 code. This limitation is imposed by X32. DOS blocking calls, for example, should be avoided since the task cannot be preempted in real mode (they should be avoided anyway since they do busy waiting and consume CPU unnecessarily).

 

Licensing terms

 

This software is public domain. You may use it for whatever you like.

 

Contacting the author

 

Please send any comments, criticism, improvements or bug reports to:

 

Hugo Etchegoyen

Buenos Aires, Argentina

hee@fibertel.com.ar

hetchegoyen@hasar.com

 


The MTask library

 

The MTask library contains:

 

1.      The multitasking kernel proper, whose API is defined in mtask.h:

 

·        Kernel.obj – Task management, scheduling, memory management, task queue primitives, inter-task message passing, context switching, timing.

·        Queue.obj – Task queues.

·        Irq.obj – Hardware interrupts.

·        Math.obj – Saving and restoring coprocessor status.

 

2.      Additional modules. Each one has a header file defining the respective API.

 

·        Sem.obj (sem.h) – Semaphores.

·        Mutex.obj (mutex.h) – Mutexes (mutual exclusion objects).

·        Monitor.obj (monitor.h) – Monitors and condition variables.

·        Pipe.obj (pipe.h) – Pipes.

·        Msgqueue.obj (msgqueue.h) – Fixed size message queues.

 

Tasks

 

A task is an execution thread. Each task has:

 

1.      A function body.

2.      A stack which is dynamically allocated when the task is created.

3.      A context, basically the register set and the math coprocessor status, if available.

4.      A control block, which holds the task’s stack pointer, some context variables and some other information used for message passing and synchronization.

 

A task’s body or function is of the type:

 

void task(void *arg)

 

When created, each task is allocated a stack and passed a single arg argument. There is no limitation to the number of tasks that can be created (except available memory). Several tasks can share the same function body. A task ends by returning from its function body or by executing the Exit() primitive (note the capital E). Executing the standard C library exit() function, or returning from main(), will end the whole program with all its tasks.

 

As with any other multithreading system, global data are shared by all tasks. It is the programmer’s responsibility to serialize the use of shared resources. These include the operating system (DOS), static and global variables and non-reentrant functions, either in the application or in libraries, including the standard C library.

 


Task states

 

Tasks can be in any of the following states:

 

·        Suspended

·        Ready

·        Running

·        Delaying

·        Waiting (in a task queue)

·        Sending (a message)

·        Receiving (a message)

·        Terminated

 

Tasks are created in the suspended state. In order to start the task running, it must be put in the ready state by calling Ready().

 

This being a single-processor system, only one task may be assigned the CPU at a time. This task is called the current task, and it is in the running state. If all the user tasks are blocked (none of them can run), MTask runs an internal task that does nothing and never blocks, the null task. The null task has minimum priority and will relinquish the CPU as soon as some other task is ready.

 

Within MTask there is a function (the scheduler) that chooses which of the currently ready to run tasks should be assigned the CPU. If the scheduler finds that the current task must be changed, it performs a context switch operation. Context switching is performed by saving the register context in the current task’s stack, saving the current stack pointer and additional context information in the current task’s control block, and restoring the stack and context from the new current task.

 

Tasks in the delaying state are waiting for some period of time to elapse. Tasks waiting in a task queue are blocked until some other task (or an interrupt) unblocks them and puts them again in the ready state. Tasks in the sending or receiving states are blocked until some other task performs the complementary operation (receiving or sending, respectively). A message is then transferred and the previously blocked task is made ready again.

 

When a task is terminated its resources are freed. However, this cannot be done immediately when a task terminates itself, either by executing Exit() or by returning from its function body. In this case the task enters the terminated state, and its resources will be freed later.

 

Scheduling

 

MTask is preemptive, with fixed priorities and time slicing.

 

Preemptive scheduling means any task may lose the CPU as a consequence of a hardware interrupt. Preemptions may be disabled on a task-by-task basis; by default all tasks start running in preemptive mode. A task may put itself in non-preemptive (or cooperative) mode. This may be useful for executing some code sections atomically. When a task is running in cooperative mode, it will only relinquish the CPU by executing a blocking primitive. Note that this mechanism protects the task from losing the CPU as a result of an interrupt, but does not protect it against the execution of interrupt code. If a task needs to protect itself from the execution of all interrupts or some specific ones, it must disable them by using the available primitives. Disabling interrupts is a mechanism frequently used within the MTask kernel in order to serialize some critical sections. It is recommended, however, that applications refrain from using it unless it is absolutely necessary.

 

Tasks run with static or fixed priorities, which may be changed at any time. Priorities will not be dynamically adjusted by the kernel, as is usually done in general-purpose operating systems like Unix. Fixed priorities are better suited to dedicated systems, since they guarantee predictable behaviour.

 

The scheduler in MTask uses a very simple algorithm: it just assigns the CPU to the highest priority task among those ready to run. If two or more ready tasks share the highest priority, the CPU is assigned to them in a round-robin fashion according to the time slice.

 

Task queues and time queue

 

Task queues are used by MTask to manage waiting, delaying and ready tasks. They are implemented as doubly-linked, sorted lists. For all queues except the time queue, the sorting criterion is task priority. When a task is inserted in a priority-sorted queue, it will be placed immediately before any other task in the queue with the same or higher priority. This means that the last task in the queue will always be the one with the highest priority, or, among two or more with the same (highest) priority, the one that has been waiting the longest.

 

All ready tasks wait for the CPU in one such queue, the ready queue. The scheduler assigns the CPU to the last task in this queue.
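The insertion rule can be illustrated with a small stand-alone model. This is only a sketch, not MTask's source: the Node and Queue types and the enqueue() and dequeue_last() names are hypothetical, and it assumes that a larger number means a higher priority.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a priority-sorted task queue (not MTask's own types) */
typedef struct Node {
    int prio;                    /* assumption: larger value = higher priority */
    int id;
    struct Node *prev, *next;
} Node;

typedef struct { Node *first, *last; } Queue;

/* Insert n immediately before the first node of the same or higher priority */
void enqueue(Queue *q, Node *n)
{
    Node *p = q->first;

    assert(n->prev == NULL && n->next == NULL);   /* n must not be queued already */
    while (p != NULL && p->prio < n->prio)
        p = p->next;
    /* p is now the first node with prio >= n->prio, or NULL (append at the end) */
    n->next = p;
    n->prev = p != NULL ? p->prev : q->last;
    if (n->prev != NULL) n->prev->next = n; else q->first = n;
    if (p != NULL) p->prev = n; else q->last = n;
}

/* Remove and return the last node: highest priority, longest waiting */
Node *dequeue_last(Queue *q)
{
    Node *n = q->last;

    if (n == NULL)
        return NULL;
    q->last = n->prev;
    if (q->last != NULL) q->last->next = NULL; else q->first = NULL;
    n->prev = n->next = NULL;
    return n;
}
```

With this rule, two tasks of equal priority leave the queue in the order in which they entered it, which is what gives round-robin behaviour among tasks of the same priority.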

 

The terminated queue is used for holding tasks that have terminated themselves, whose resources have not yet been freed. Memory allocation functions flush this queue and free the associated memory.

 

The time queue is sorted by increasing wakeup time. There is a variable called msecs in each task’s control block, which is used to count the milliseconds remaining until wakeup. For the first task in the time queue this is the absolute remaining time; for the rest, the msecs variable holds the difference in wakeup time from the previous task. The timer interrupt decrements the msecs variable of the first task in the time queue; when it reaches zero, the task is removed from the time queue and made ready. The second task now comes to the front of the queue; its msecs variable is now the absolute remaining time for wakeup, and so on.
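The differential bookkeeping of the time queue can be sketched with a stand-alone model. The Timer type and the tq_insert() and tq_tick() functions below are hypothetical names chosen for illustration; the model uses a singly-linked list for brevity and lets one tick cover several milliseconds, which are assumptions rather than details taken from MTask's source.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a differential (delta) time queue */
typedef struct Timer {
    unsigned msecs;          /* delta from the previous entry (absolute for the first) */
    int id;
    struct Timer *next;
} Timer;

/* Queue t so that it wakes up msecs milliseconds from now */
void tq_insert(Timer **head, Timer *t, unsigned msecs)
{
    Timer **pp = head;

    /* Skip entries that wake up no later than t, consuming their deltas */
    while (*pp != NULL && (*pp)->msecs <= msecs) {
        msecs -= (*pp)->msecs;
        pp = &(*pp)->next;
    }
    t->msecs = msecs;
    t->next = *pp;
    if (t->next != NULL)
        t->next->msecs -= msecs;     /* successor keeps its absolute wakeup time */
    *pp = t;
}

/* Account for elapsed milliseconds; store the ids of expired timers in woken[] */
int tq_tick(Timer **head, unsigned elapsed, int woken[], int max)
{
    int n = 0;

    while (*head != NULL) {
        Timer *t = *head;
        if (t->msecs > elapsed) {
            t->msecs -= elapsed;     /* only the first entry is decremented */
            break;
        }
        elapsed -= t->msecs;         /* carry the remainder to the next entry */
        *head = t->next;
        t->next = NULL;
        assert(n < max);
        woken[n++] = t->id;
    }
    return n;
}
```

The point of the differential scheme is that a timer tick touches only the first entry of the queue, however many tasks are waiting.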

 

Tasks are in the time queue when they are in the delaying state, or in the waiting, sending or receiving states in case the blocking primitive was called with a finite timeout.

 

At any given time a task may be found in no queue at all, in a task queue, in the time queue, or in both. These are the possible combinations, according to the task’s state:

 

·        No queue: states suspended, running, receiving (without timeout).

·        Ready queue: state ready to run.

·        Time queue: states delaying, receiving (with timeout).

·        Task queue: states waiting in task queue (without timeout), sending (without timeout).

·        Task queue and time queue: states waiting in task queue (with timeout), sending (with timeout).

·        Terminated queue: state terminated.

 

Main task and null task

 

The main task and the null task are created when MTask is initialized. The main task runs the program’s main code with default priority. It runs like any other task, but it has some special features. Since it has not been created with CreateTask(), it cannot be deleted with DeleteTask() and it cannot call Exit(), which is equivalent to DeleteTask(CurrentTask()). The main task can only terminate by calling the C library function exit() or returning from main(), and then the whole program terminates.

 

Constants

 

Some constants defined in mtask.h:

 

MIN_PRIO = 0            Priority of the null task

DEFAULT_PRIO = 50       Priority of the main task

MAX_PRIO = -1           Maximum priority

FOREVER = -1            Infinite timeout

 

MTask API

 

The API is defined in the header file mtask.h.

 

Object constructors (functions starting with Create...) accept a name string for the object. This name is not used by the kernel, but it might be useful for debugging. A dynamic copy of the string is stored in the object. Pass NULL if you are not interested in giving the object a name.

 

MTask initialization

 

void InitMTask(Time_t mspertick, Time_t timeslice)

 

Initializes MTask’s kernel and creates the main and null tasks. It must be called before any other MTask function.

 

MTask uses the real time interrupt (IRQ 0) as a base for time measurements. In a standard PC running DOS, this interrupt is triggered approximately once every 55 milliseconds (about 18.2 times per second). The mspertick argument, if different from zero, reprograms the real-time counter and sets this interval to any number of milliseconds from 1 to 55. A high timer frequency (maximum 1 kHz when mspertick == 1) provides a finer resolution in the measurement of delay and timeout intervals. The default timer interval (55 milliseconds) is restored upon program termination.
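As an illustration of what reprogramming the counter involves, the sketch below computes a reload value for the PC's 8253/8254 timer, whose input clock is 1,193,182 Hz and whose 16-bit counter holds at most 65536 counts. The function names, the rounding and the clamping are assumptions made for this example; MTask's actual calculation may differ.

```c
#include <assert.h>

#define PIT_HZ  1193182UL    /* input clock of the 8253/8254 timer, in Hz */
#define PIT_MAX 65536UL      /* largest reload value (the DOS default) */

/* Hypothetical helper: reload value for a tick of mspertick milliseconds.
   Zero selects the default; results are clamped to the 16-bit counter range. */
unsigned long pit_divisor(unsigned mspertick)
{
    unsigned long d;

    assert(mspertick <= 55);
    if (mspertick == 0)
        return PIT_MAX;
    d = (PIT_HZ * mspertick + 500) / 1000;    /* rounded to the nearest count */
    return d > PIT_MAX ? PIT_MAX : d;
}

/* Actual tick length, in nanoseconds (truncated), for a given reload value */
unsigned long tick_ns(unsigned long divisor)
{
    return (unsigned long)((unsigned long long)divisor * 1000000000ULL / PIT_HZ);
}
```

Because the reload value is a whole number of counts, a nominal 1 millisecond tick actually lasts slightly less than 1 millisecond, and a nominal 55 millisecond tick is limited by the counter to about 54.9 milliseconds. Errors of this kind accumulate in any millisecond count derived from ticks.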

 

The timeslice argument, if different from zero, establishes the number of milliseconds that a task may run before being preempted when another task of the same priority is ready. The default value is two default timer ticks, or 110 milliseconds.

 

Task creation and destruction

 

Task_t *CreateTask(TaskFunc_t func, unsigned stacksize, void *arg, char *name, Prio_t priority)

Task_t *CurrentTask(void)

void DeleteTask(Task_t *task)

 

CreateTask() creates a new task using the func body, which receives arg as its argument. The initial state is TaskSuspended. It allocates a stack of stacksize bytes. To run the task, call Ready().

 

CurrentTask() returns a pointer to the calling task.

 

DeleteTask() destroys a task. This function may not be applied to the main task.

 

Math context

 

void ProtectMath(Task_t *task)

 

If more than one task uses the 80387 arithmetic coprocessor, each of them should call this function to enable saving and restoring the coprocessor status during context switches. Digital Mars math emulation is reentrant, so this protection is only necessary if a coprocessor is used.

 

Priorities

 

Prio_t GetPriority(Task_t *task)

void SetPriority(Task_t *task, Prio_t priority)

 

If you change the priority of a task that is currently in a priority-sorted queue (i.e. any queue except the time queue), the task is removed from the queue and then re-inserted in it in order to reflect its new priority. If you change the priority of either the current task or one of the ready tasks the scheduler is called, since the new priority setting may dictate the need to reassign the CPU.

 

State changes

 

void Suspend(Task_t *task)

void Ready(Task_t *task)

 

Suspend() blocks a task without placing it in any queue. The only way to make a suspended task ready again is by calling Ready().

 

Ready() makes a task ready to run, regardless of its previous state. If the task was blocked as a result of calling WaitQueue(), Send() or Receive(), these functions will return false, indicating failure.

 

Task queues

 

TaskQueue_t *CreateQueue(char *name)

void DeleteQueue(TaskQueue_t *queue)

bool WaitQueue(TaskQueue_t *queue)

bool WaitQueueTimed(TaskQueue_t *queue, Time_t msecs)

bool SignalQueue(TaskQueue_t *queue)

void FlushQueue(TaskQueue_t *queue, bool success)

 

Task queues offer an IPC mechanism similar to the sleep() and wakeup() primitives of the Unix kernel. Tasks can use the queues to wait for events; signalling the queue generates the event and wakes up the waiting task. A basic limitation of this mechanism is that task queues have no way of remembering events that are signalled before some task waits for them; these events are lost. They are very useful, however, as a basic building block to construct higher-level IPC primitives. For example, a semaphore can be easily built using a task queue and a counter to remember signalled, but not yet consumed, events.

 

CreateQueue() creates a task queue; DeleteQueue() destroys it. If there are tasks waiting in the queue when it is destroyed, their waiting operations fail.

 

The WaitQueue...() functions are for waiting in a queue; the task enters the TaskWaiting state. True as return status indicates success, false means failure. Failure may be due to timeout, Ready() or queue flushing.

 

SignalQueue() wakes up the highest priority (last) task in the queue, whose WaitQueue...() operation completes successfully. This function returns true if a task was awakened, and false if the queue was empty (i.e. the signalled event was lost).

 

FlushQueue() wakes up all tasks in the queue, which complete their respective WaitQueue...() operations with the given status.

 

Message passing

 

bool Send(Task_t *to, void *msg, unsigned size)

bool SendCond(Task_t *to, void *msg, unsigned size)

bool SendTimed(Task_t *to, void *msg, unsigned size, Time_t msecs)

bool Receive(Task_t **from, void *msg, unsigned *size)

bool ReceiveCond(Task_t **from, void *msg, unsigned *size)

bool ReceiveTimed(Task_t **from, void *msg, unsigned *size, Time_t msecs)

 

MTask’s message passing uses the so-called rendezvous system, which gives the highest possible level of coupling between sender and receiver. The message is transferred directly between the participating tasks without intermediate buffering. The first task that calls a messaging primitive (send or receive) blocks until its peer performs the matching operation.

 

Sending or receiving a message bigger than the receiver’s buffer will produce a fatal error.

 

Receive() gets as its first argument (from) a pointer to a pointer to a task. This may be used to select a sender to receive messages from, or to get a pointer to the sender once the message is received. The possible scenarios are as follows:

 

from == NULL: receive a message from any task.

from != NULL, *from == NULL: receive a message from any task; once received, *from = pointer to sender.

from != NULL, *from == pointer to sender: receive a message from the given sender only.
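The three scenarios reduce to a simple test on the from argument. The sketch below models that test outside the kernel; struct Task is a stand-in for Task_t, and accepts_sender() and report_sender() are hypothetical helper names, not part of the MTask API.

```c
#include <assert.h>
#include <stddef.h>

struct Task { int id; };          /* hypothetical stand-in for Task_t */

/* Would a receiver called with this from argument accept a message from sender? */
int accepts_sender(struct Task **from, const struct Task *sender)
{
    return from == NULL          /* no selection and no interest in the sender */
        || *from == NULL         /* any sender, reported back after reception */
        || *from == sender;      /* only this particular sender */
}

/* After the transfer, tell the caller who the sender was, if it asked */
void report_sender(struct Task **from, struct Task *sender)
{
    assert(sender != NULL);
    if (from != NULL)
        *from = sender;
}
```

A task that receives in a loop from any sender would therefore have to reset its task pointer to NULL before each call; otherwise the second call would accept messages only from the first sender.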

 

In order to receive the message contents, a pointer to a suitable buffer (msg) is passed. If you are not interested in the message itself, msg can be NULL. In this case the messaging system may be used just for task synchronization. If msg is not NULL, the size argument points to an integer variable where the size of the buffer is initially stored. Upon reception of the message, this variable will hold the real message size. Messages bigger than the buffer generate a fatal error.

 

ReceiveCond() and ReceiveTimed() are the conditional (non-blocking) and timeout versions of Receive().

 

Messages are transmitted using Send() or its conditional or timeout versions, SendCond() and SendTimed(). These functions may be used just for synchronization purposes by passing NULL for the msg pointer. In the blocking calls, if the receiver is not waiting for a message from the sending task or from any task, the sender is blocked in state TaskSending and waits in a task queue associated with the receiver.

 

Current task control

 

These functions control the state of the calling task:

 

void Pause(void)

void Scheduler(void)

void Yield(void)

void Delay(Time_t msecs)

void DelayUntil(Time_t deadline)

void Exit(void)

 

Pause() suspends the current task – it is equivalent to Suspend(CurrentTask()).

 

Scheduler() calls MTask’s scheduler, giving it an opportunity to reassign the CPU. It is used mostly by tasks running in cooperative mode, and it is placed in the stack when returning from a first-level interrupt to take care of preemptions. This function will relinquish the CPU if some other task of higher priority is ready, or if another task of the same priority is ready and the current task’s time slice is exhausted.

 

Yield() is like Scheduler(), but will reassign the CPU to any ready task of the same or higher priority, even if the time slice is not yet exhausted.

 

Delay() waits for a given number of milliseconds.

 

DelayUntil() waits until some time in the future. If the deadline has already expired, it does not block. This function gives better accuracy than Delay() for periodic tasks.
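The accuracy difference for periodic tasks can be seen in a small simulation with a fake clock. Everything here is a model: the sim_...() functions and the now variable stand in for the real primitives and for GetTime(), and clock wraparound is ignored.

```c
#include <assert.h>

static unsigned long now;                 /* simulated clock, in milliseconds */

static void sim_work(unsigned ms)  { now += ms; }     /* the task's periodic job */
static void sim_delay(unsigned ms) { now += ms; }     /* models Delay() */
static void sim_delay_until(unsigned long deadline)   /* models DelayUntil() */
{
    if (deadline > now)                   /* an expired deadline does not block */
        now = deadline;
}

/* Relative delays: the work time is added to every period, so the task drifts */
unsigned long run_with_delay(int cycles, unsigned period, unsigned work)
{
    int i;

    assert(period > 0);
    now = 0;
    for (i = 0; i < cycles; i++) {
        sim_work(work);
        sim_delay(period);
    }
    return now;
}

/* Absolute deadlines: each deadline is computed from the previous one */
unsigned long run_with_delay_until(int cycles, unsigned period, unsigned work)
{
    unsigned long deadline = 0;
    int i;

    assert(period > 0);
    now = 0;
    for (i = 0; i < cycles; i++) {
        sim_work(work);
        deadline += period;
        sim_delay_until(deadline);
    }
    return now;
}
```

With a period of 100 milliseconds and 7 milliseconds of work per cycle, ten cycles take 1070 milliseconds in the first version and 1000 milliseconds in the second.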

 

Exit() terminates the calling task, and is equivalent to DeleteTask(CurrentTask()). This function is automatically called when a task returns from its function body. It cannot be called from the main task.

 

Non-preemptive (cooperative) mode

 

void Atomic(void)

void Unatomic(void)

 

Atomic() increments the atomic_level field of the current task’s control block. When this field has a value greater than zero, the task runs in cooperative mode. Unatomic() decrements the atomic_level field; when it reaches zero, the scheduler is called to make up for possibly delayed context switches, and the task runs in preemptive mode thereafter.

 

These functions may be nested, which facilitates their use in libraries.
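The nesting behaviour can be modelled with a counter. In the sketch below, TaskModel and the model_...() functions are hypothetical, and the switch_pending flag is an assumption used to represent a context switch that was postponed while the task was in cooperative mode; the real kernel simply calls the scheduler when the level drops to zero.

```c
#include <assert.h>

/* Hypothetical model of a task's preemption state */
typedef struct {
    unsigned atomic_level;    /* greater than zero means cooperative mode */
    int switch_pending;       /* a preemption arrived while in cooperative mode */
    int switches;             /* context switches performed so far */
} TaskModel;

void model_atomic(TaskModel *t)
{
    t->atomic_level++;
}

/* An interrupt asks for the CPU to be reassigned */
void model_preempt_request(TaskModel *t)
{
    if (t->atomic_level > 0)
        t->switch_pending = 1;    /* postponed until the task leaves atomic mode */
    else
        t->switches++;            /* preemptive mode: switch immediately */
}

void model_unatomic(TaskModel *t)
{
    assert(t->atomic_level > 0);  /* calls must be balanced */
    if (--t->atomic_level == 0 && t->switch_pending) {
        t->switch_pending = 0;
        t->switches++;            /* make up for the delayed context switch */
    }
}
```

Because only the outermost Unatomic() re-enables preemption, a library function can bracket its own critical section without knowing whether its caller is already in cooperative mode.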

 

Hardware interrupts

 

typedef bool (*Handler_t)(unsigned irq)

 

void SetHandler(unsigned irq, Handler_t handler)

void EndInterrupt()

void DisableIRQ(unsigned irq)

void EnableIRQ(unsigned irq)

 

MTask traps hardware interrupts on demand. The MTask kernel traps the real time interrupt (IRQ 0) internally; all other IRQs may be trapped by the application. The interrupt module handles switching to internal stacks, supports nested interrupts and tracks the interrupt level in order to allow for preemption when returning from first-level interrupts occurring in the application code.

 

User interrupt handlers must be installed using SetHandler(). The handler is a standard C function which receives the IRQ number as an argument and must return true if the interrupt is handled completely, or false to indicate that control should be chained to the previous handler. If the interrupt is handled completely, the handler should call EndInterrupt() before returning; this sends the EOI command to the PICs. If you want to un-trap an IRQ and restore the original vector, call SetHandler(irq, NULL). Trying to trap IRQ 0 causes Panic(). If you need to execute some application code on each timer interrupt, use SetTimerCallback() instead (see below).

 

Individual hardware interrupts may be enabled or disabled by calling EnableIRQ() and DisableIRQ().

 

Upon program termination, the original vectors and PIC masks are restored.

 

Heap functions

 

void *Malloc(unsigned size)

void *Realloc(void *mem, unsigned size)

char *StrDup(char *str)

void Free(void *mem)

 

These functions are thread-safe versions of their lowercase counterparts in the C library. They also have some additional features: memory allocated by Malloc() is filled with zeroes, and Realloc(), StrDup() and Free() tolerate null pointers. Before allocating memory, the terminated tasks queue is flushed and the associated memory returned to the heap. Malloc() will never return NULL; memory exhaustion will produce a fatal error. Realloc() and StrDup() may only return NULL if passed a NULL pointer argument.

 

Miscellaneous functions

 

Time_t GetTime(void)

 

This function returns the number of milliseconds elapsed since MTask was initialized. Its absolute value may not be very accurate, depending on the duration of the timer tick, because of rounding errors when the timer count value is calculated. It is not intended to be used for absolute time measurements, but rather differentially to measure time intervals and to calculate deadlines for DelayUntil() relative to the current time.

 

void SetData(Task_t *task, void *data)

 

A convenient way of assigning a task a pointer to private data. Not used by the kernel.

 

typedef void (*Switcher_t)(Task_t *save, Task_t *restore)

 

void SetSwitcher(Switcher_t switcher)

 

MTask takes care of saving and restoring the register and coprocessor context of tasks. Users may want to save and restore other contexts; this is accomplished by installing a suitable switcher function. This switcher is called immediately before MTask’s own context switch; it receives pointers to the task which is about to relinquish the CPU, whose context must be saved, and to the next task to run, whose context must be restored. This function is called with interrupts disabled, and must not enable them.

 

typedef void (*Callback_t)(void)

 

void SetTimerCallback(Callback_t callback)

 

Sets a pointer to a function to be called on each timer interrupt. 

 

void Panic(char *msg)

 

Called in case of fatal errors. It prints an error message to the standard error output and exits the program.

 

Disabling and enabling general interrupts

 

void DisableInts()

void RestoreInts()

 

MTask keeps a global interrupt disabling counter; interrupts are disabled when the count is greater than zero. As this counter is saved and restored with each context switch, the status of interrupts is private for each task, i.e., some tasks may run with interrupts disabled and some others with interrupts enabled. DisableInts() disables interrupts and increments the counter; RestoreInts() decrements the counter and enables interrupts when it reaches zero. These functions may be nested, which facilitates their use in libraries.

 

Calling the MTask API from interrupt handlers

 

Interrupt handlers may only call a small subset of MTask’s API, namely, non-blocking functions that may wake up sleeping tasks:

 

·        Ready()

·        SignalQueue()

·        FlushQueue()

·        SendCond()

 

Note that only the conditional form of the message sending primitives may be used, since it is the only one guaranteed not to block. Interrupts can send messages only to tasks waiting for them; this situation is typical in all message-driven systems (see the case of Minix in Operating Systems – Design and Implementation by A. Tanenbaum).


Semaphores

 

The semaphore API is defined in sem.h.

 

typedef struct Semaphore_t

{

      unsigned          value;

      TaskQueue_t *     queue;

}

Semaphore_t;

 

 

A semaphore is implemented by a task queue and a counter (the semaphore value) that takes care of previously signalled but not yet consumed events.

 

Semaphore_t *CreateSem(char *name, unsigned value)

void DeleteSem(Semaphore_t *sem)

 

The semaphore is created with an initial value.

 

bool WaitSem(Semaphore_t *sem)

bool WaitSemCond(Semaphore_t *sem)

bool WaitSemTimed(Semaphore_t *sem, Time_t msecs)

 

These functions implement the basic DOWN semaphore primitive in its three flavours: without timeout, with timeout and conditional. If the semaphore value is greater than zero, DOWN decrements the value and returns success. This operation consumes a previously signalled event. If the semaphore value is zero, DOWN either fails (in the conditional version) or blocks the calling task in the semaphore’s task queue.

 

void SignalSem(Semaphore_t *sem)

 

This function implements the basic UP semaphore primitive. It signals an event; if tasks are waiting in the semaphore’s queue, it wakes up the one with the highest priority and makes it ready; this task will complete its DOWN operation successfully. If no task is awakened, the semaphore value is incremented to account for a not-yet-consumed event.
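The interplay between the counter and the task queue can be modelled without a kernel. In this sketch the queue is reduced to a count of waiting tasks, and SemModel and the sem_...() functions are hypothetical names; a return value of 0 from sem_down() stands for "the caller would block".

```c
#include <assert.h>

/* Hypothetical model: the task queue is reduced to a count of waiters */
typedef struct {
    unsigned value;      /* events signalled but not yet consumed */
    unsigned waiters;    /* tasks blocked in the semaphore's queue */
} SemModel;

/* Conditional DOWN: consume an event if there is one, never block */
int sem_down_cond(SemModel *s)
{
    if (s->value > 0) {
        s->value--;
        return 1;
    }
    return 0;
}

/* Blocking DOWN: returns 1 if it completes at once, 0 if the caller blocks */
int sem_down(SemModel *s)
{
    if (sem_down_cond(s))
        return 1;
    s->waiters++;
    return 0;
}

/* UP: wake one waiter if there is any, otherwise remember the event.
   Returns 1 if a task was awakened. */
int sem_up(SemModel *s)
{
    assert(s->waiters == 0 || s->value == 0);   /* nobody waits while events remain */
    if (s->waiters > 0) {
        s->waiters--;            /* the awakened task completes its DOWN */
        return 1;
    }
    s->value++;
    return 0;
}
```

The invariant checked in sem_up() summarizes the design: at any time either the counter or the queue may be non-empty, but never both.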

 

unsigned ValueSem(Semaphore_t *sem)

 

Tells the semaphore value, i.e., the number of previously signalled but not yet consumed events.

 

void FlushSem(Semaphore_t *sem, bool wait_ok)

 

This is like SignalSem(), but it wakes up all tasks waiting in the semaphore. The completion status of their DOWN operations is given as the second argument.

 

The following semaphore primitives may be safely called from interrupt handlers:

 

·        ValueSem()

·        SignalSem()

·        FlushSem()


Mutexes

 

The mutex API is defined in mutex.h.

 

typedef struct Mutex_t

{

      unsigned          use_count;

      Task_t *          owner;

      Semaphore_t *     sem;

}

Mutex_t;

 

Mutexes are recursive mutual exclusion objects. A semaphore with an initial value of 1 might be used to the same effect, but it would not allow recursive occupation of the critical zone, which is a useful feature in libraries. The mutex uses a mutual exclusion semaphore, but adds some administration to allow recursive occupation. The use_count field tracks the nesting level of recursive occupations; the owner field indicates which task is currently in the critical zone. Initial values for an unoccupied mutex are 1 for the semaphore count, 0 for use_count and NULL for owner.

 

When a mutex is first occupied by a task, owner points to the occupying task and use_count is 1. Nested occupations increment the value of use_count. The semaphore value of an occupied mutex is always 0. Tasks waiting to enter the mutex sleep in the semaphore’s queue.

 

Mutex_t *CreateMutex(char *name)

void DeleteMutex(Mutex_t *mut)

 

Create an initially free mutex and destroy it.

 

bool EnterMutex(Mutex_t *mut)

bool EnterMutexCond(Mutex_t *mut)

bool EnterMutexTimed(Mutex_t *mut, Time_t msecs)

 

Functions to occupy a mutex in three flavours: with and without timeout, and conditional. The first two may block if the mutex is owned by another task. Once occupied for the first time, each nested occupation by the same task merely increments the use_count.

 

void LeaveMutex(Mutex_t *mut)

 

Decrements the use_count and frees the mutex when this count reaches zero. If tasks are waiting to enter the mutex, the highest-priority one is then awakened. In order to free a mutex, this function must be called once for every successful occupation.
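The bookkeeping described above can be sketched as a stand-alone model of the conditional (non-blocking) case. MutexModel and the mutex_...() functions are hypothetical; the semaphore is reduced to its value, and tasks are identified by arbitrary non-null pointers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a recursive mutex; the semaphore is just its value */
typedef struct {
    unsigned use_count;      /* nesting level of the current owner */
    const void *owner;       /* task inside the critical zone, or NULL */
    unsigned sem_value;      /* 1 when free, 0 when occupied */
} MutexModel;

/* Conditional entry: returns 1 on success, 0 if another task owns the mutex */
int mutex_enter_cond(MutexModel *m, const void *task)
{
    assert(task != NULL);
    if (m->owner == task) {          /* nested occupation by the owner */
        m->use_count++;
        return 1;
    }
    if (m->sem_value == 0)           /* owned by somebody else */
        return 0;
    m->sem_value = 0;                /* DOWN on the mutual exclusion semaphore */
    m->owner = task;
    m->use_count = 1;
    return 1;
}

/* Leave once; the mutex is freed when the outermost occupation ends */
void mutex_leave(MutexModel *m, const void *task)
{
    assert(m->owner == task && m->use_count > 0);
    if (--m->use_count == 0) {
        m->owner = NULL;
        m->sem_value = 1;            /* UP: a waiting task could now enter */
    }
}
```

A mutex that was entered twice by the same task stays occupied after the first mutex_leave() and becomes free only after the second.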

 

Mutex primitives may not be called from interrupt handlers.


Monitors

 

Monitors and condition variables are implemented in this module. The API is defined in monitor.h.

 

typedef struct

{

      Task_t *          owner;

      Semaphore_t *     sem;

}

Monitor_t;

 

Monitors are basically non-recursive mutexes. Compare a monitor’s structure with that of a mutex: the use_count field is missing.

 

Monitor_t *CreateMonitor(char *name)

void DeleteMonitor(Monitor_t *mon)

bool EnterMonitor(Monitor_t *mon)

bool EnterMonitorCond(Monitor_t *mon)

bool EnterMonitorTimed(Monitor_t *mon, Time_t msecs)

void LeaveMonitor(Monitor_t *mon)

 

The basic API is similar to the mutex API. However, the real potential of monitors as IPC devices lies in the use of condition variables.

 

typedef struct

{

      Monitor_t *       monitor;

      TaskQueue_t *     queue;

}

Condition_t;

 

Condition variables are associated with monitors. They hold a task queue where tasks may sleep while waiting for some condition to be fulfilled, and a pointer to the associated monitor. Condition variables are created and destroyed by:

 

Condition_t *CreateCondition(char *name, Monitor_t *mon)

void DeleteCondition(Condition_t *cond)

 

Tasks waiting for a condition to be fulfilled execute one of the following:

 

bool WaitCondition(Condition_t *cond)

bool WaitConditionTimed(Condition_t *cond, Time_t msecs)

 

and tasks wishing to wake up the sleeping tasks execute:

 

bool SignalCondition(Condition_t *cond)

 

As has been said before, a limitation of task queues as an IPC mechanism lies in the fact that events signalled before tasks wait for them are lost. The WaitCondition() primitive avoids this by atomically doing the following:

 

1.      Leave the monitor (giving another task the opportunity to enter it and signal the condition).

2.      Sleep in the condition variable’s task queue.

3.      Re-enter the monitor.

 

As tasks must enter the monitor either to wait on a condition or to signal it, critical races between verifying, waiting and signalling are avoided.

 

The monitor primitives may not be called by interrupt handlers.


Pipes

 

The pipe API is defined in pipe.h.

 

Pipes allow unidirectional flow of data between tasks. This information is treated by the pipe as a mere stream of bytes. Tasks putting bytes in the writing end of the pipe may be considered producers, and tasks getting the bytes from the reading end may be considered consumers. This implementation of pipes supports multiple producers and consumers operating on the pipe simultaneously. Pipes are also called FIFOs (first-in first-out devices).

 

The pipe has an internal buffer that holds all bytes produced but not yet consumed. The size of this buffer may be chosen arbitrarily, since the pipe guarantees it will never overflow. Tasks trying to write to a full pipe and tasks trying to read from an empty pipe will block. A small buffer provides tight coupling between producers and consumers: since the buffer fills very quickly, they are forced to run in lockstep. A big buffer provides loose coupling; a producer might even produce its whole output and leave it in the pipe’s buffer before a consumer reads the first byte.

 

The implementation is based on the following object:

 

 

typedef struct

{

      Mutex_t *         mutex_get;

      Mutex_t *         mutex_put;

      Monitor_t *       monitor;

      Condition_t *     cond_get;

      Condition_t *     cond_put;

      unsigned          size;

      unsigned          avail;

      char *            buf;

      char *            head;

      char *            tail;

      char *            end;

}

Pipe_t;

 

The buf field is a pointer to the dynamically allocated internal buffer; head and tail are pointers for implementing the classic circular buffer algorithm; end merely points to the last-plus-one byte of the buffer for fast wraparound. The size field is the size of the buffer, and avail is the number of bytes currently stored in it. As the pipe semantics regarding blocking and awakening of tasks are rather complex, a monitor was chosen as the basic IPC device since it offered a straightforward solution. Tasks trying to write to a full pipe or trying to read from an empty pipe sleep on the two condition variables, cond_put and cond_get. Finally, the two optional mutexes mutex_put and mutex_get allow for mutual exclusion between multiple producers and consumers, respectively, guaranteeing that writes and reads are atomic. They need not be used in single producer or single consumer situations.
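The circular buffer part of this structure can be sketched on its own, leaving out the monitor, the condition variables and the mutexes. The Ring type and the ring_...() functions are hypothetical; the sketch assumes that head is the reading position and tail the writing position, which the description above does not state, and it never blocks, in the manner of the conditional primitives.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical model of the pipe's buffer fields, without any synchronization */
typedef struct {
    unsigned size, avail;    /* buffer size and bytes currently stored */
    char *buf;               /* start of the buffer */
    char *head;              /* assumption: next byte to read */
    char *tail;              /* assumption: next free position to write */
    char *end;               /* last-plus-one byte, for fast wraparound */
} Ring;

int ring_init(Ring *r, unsigned size)
{
    assert(size > 0);
    r->buf = malloc(size);
    if (r->buf == NULL)
        return 0;
    r->head = r->tail = r->buf;
    r->end = r->buf + size;
    r->size = size;
    r->avail = 0;
    return 1;
}

/* Write as many of the n bytes as fit; returns the number written */
unsigned ring_put(Ring *r, const void *data, unsigned n)
{
    const char *src = data;
    unsigned i, room = r->size - r->avail;

    if (n > room)
        n = room;
    for (i = 0; i < n; i++) {
        *r->tail++ = src[i];
        if (r->tail == r->end)       /* wrap around */
            r->tail = r->buf;
    }
    r->avail += n;
    return n;
}

/* Read up to n bytes; returns the number read */
unsigned ring_get(Ring *r, void *data, unsigned n)
{
    char *dst = data;
    unsigned i;

    if (n > r->avail)
        n = r->avail;
    for (i = 0; i < n; i++) {
        dst[i] = *r->head++;
        if (r->head == r->end)       /* wrap around */
            r->head = r->buf;
    }
    r->avail -= n;
    return n;
}
```

Keeping avail as a separate count removes the usual ambiguity of circular buffers, where head == tail could mean either an empty or a full buffer.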

 

Pipes are created and destroyed by the following primitives:

 

Pipe_t *CreatePipe(char *name, unsigned size, bool serialized_get, bool serialized_put)

void DeletePipe(Pipe_t *p)

 

The size argument is the size of the internal buffer, and the two booleans indicate whether the serializing mutexes will be used.

 

These are the reading and writing primitives, which mimic quite closely the semantics of Unix pipes:

 

unsigned GetPipe(Pipe_t *p, void *data, unsigned size, bool get_all)

unsigned GetPipeTimed(Pipe_t *p, void *data, unsigned size, Time_t msecs, bool get_all)

unsigned GetPipeCond(Pipe_t *p, void *data, unsigned size)

 

unsigned PutPipe(Pipe_t *p, void *data, unsigned size)

unsigned PutPipeTimed(Pipe_t *p, void *data, unsigned size, Time_t msecs)

unsigned PutPipeCond(Pipe_t *p, void *data, unsigned size)

 

PutPipe...() and GetPipe...() write to and read from a pipe. All of them return the number of bytes written or read.

 

PutPipe() writes all the given bytes (size); in so doing, it may block one or more times.

PutPipeTimed() behaves like PutPipe(), but it may return prematurely because of timeout.

PutPipeCond() will write as many of the given bytes as possible and will never block.

 

If the get_all argument is true, GetPipe() and GetPipeTimed() will behave as exact mirrors of PutPipe() and PutPipeTimed(). If get_all is false, however, they will only block if the pipe is empty, and will return as soon as it has something to be read, even if the number of bytes is less than size. This is the most frequently required semantics, since usually size is just an upper limit in order not to overflow the reading buffer. This is also the semantics of Unix reads from a pipe. GetPipeCond() will never block. The following function tells the number of bytes currently stored in the pipe:

 

unsigned AvailPipe(Pipe_t *p)

 

Pipe primitives may not be called from interrupt handlers.
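
The difference between the two get_all modes can be made concrete with a small sketch that does not use the MTask API. It shows how a read of exactly size bytes can be viewed as a loop over short reads of the kind returned when get_all is false. Everything here is hypothetical: ShortRead_t stands in for such a short read, read_exactly() is the loop, and ChunkSource with chunk_read() is only a test source that hands out a few bytes per call. Treating a zero return as "give up" is an assumption of the sketch, chosen to resemble a timed read that ran out of time.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for a short read: delivers at most size bytes,
   possibly fewer, and returns the number delivered. */
typedef unsigned (*ShortRead_t)(void *src, void *data, unsigned size);

/* Repeats short reads until size bytes have been collected or the
   reader returns 0. Returns the number of bytes collected. */
unsigned read_exactly(ShortRead_t reader, void *src, void *data, unsigned size)
{
      char *dst = (char *) data;
      unsigned done = 0, n;

      while ( done < size )
      {
            n = reader(src, dst + done, size - done);
            if ( n == 0 )                 /* assumed to mean: nothing more */
                  break;
            done += n;
      }
      return done;
}

/* Test source that hands out at most chunk bytes per call. */
typedef struct
{
      const char *      data;       /* next byte to deliver */
      unsigned          left;       /* bytes not yet delivered */
      unsigned          chunk;      /* largest delivery per call */
      unsigned          calls;      /* number of calls made so far */
}
ChunkSource;

unsigned chunk_read(void *src, void *data, unsigned size)
{
      ChunkSource *s = (ChunkSource *) src;
      unsigned n = s->left < s->chunk ? s->left : s->chunk;

      if ( n > size )
            n = size;
      memcpy(data, s->data, n);
      s->data += n;
      s->left -= n;
      s->calls++;
      return n;
}
```

A caller that only needs "whatever has arrived, up to size bytes" uses a single short read; a caller that needs a complete fixed-length record needs the loop, which is the behaviour the text attributes to get_all being true.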


Message queues

 

The message queue API is defined in msgqueue.h.

 

The message queue is a FIFO (first-in first-out device) holding fixed size messages. The implementation is a straightforward circular buffer algorithm with two semaphores (one for occupied message slots, another one for free message slots):

 

typedef struct

{

      Mutex_t *         mutex_get;

      Mutex_t *         mutex_put;

      Semaphore_t *     sem_get;

      Semaphore_t *     sem_put;

      unsigned          msg_size;

      char *            buf;

      char *            head;

      char *            tail;

      char *            end;

}

MsgQueue_t;

 

Buf points to the dynamically allocated buffer. Its size equals the product of the maximum number of messages in the queue and the message size (msg_size). Head and tail are pointers for implementing the circular buffer algorithm; end points to the last-plus-one byte in the buffer for fast wraparound. The two semaphores sem_get and sem_put are initialized to zero and to the maximum number of messages, respectively. The two optional mutexes, mutex_get and mutex_put, need only be used if multiple message consumers or producers are present. Message queues are created and destroyed by:

 

MsgQueue_t *CreateMsgQueue(char *name, unsigned msg_max, unsigned msg_size, bool serialized_get, bool serialized_put)

void DeleteMsgQueue(MsgQueue_t *mq)

 

The msg_max parameter is the queue capacity (maximum number of messages), msg_size is the size of each message, and the two booleans indicate whether more than one consumer or producer will be using the queue at a time.

 

These are the queue handling primitives:

 

bool GetMsgQueue(MsgQueue_t *mq, void *msg)

bool GetMsgQueueCond(MsgQueue_t *mq, void *msg)

bool GetMsgQueueTimed(MsgQueue_t *mq, void *msg, Time_t msecs)

bool PutMsgQueue(MsgQueue_t *mq, void *msg)

bool PutMsgQueueCond(MsgQueue_t *mq, void *msg)

bool PutMsgQueueTimed(MsgQueue_t *mq, void *msg, Time_t msecs)

unsigned AvailMsgQueue(MsgQueue_t *mq)

 

The Get...() and Put...() primitives read one message from the queue or write one into it, respectively. The return status is just success or failure. The conditional versions will never block; the unconditional ones will block when trying to read a message from an empty queue or trying to write a message to a full queue; the timed versions may return prematurely because of timeout. The AvailMsgQueue() function tells how many unread messages are currently stored in the queue.
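
The interplay between the circular buffer and the two semaphores can be sketched in a single-task model. This is an illustration under stated assumptions, not the MTask code: MsgRing, msgring_init(), msgring_free(), msgring_put() and msgring_get() are hypothetical names, two plain counters (count_get and count_put) stand in for sem_get and sem_put, and head is assumed to be the reading position. Only the behaviour of the conditional versions is modelled, so a full or empty queue causes a failure return instead of blocking.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical model of the message queue object without mutexes. */
typedef struct
{
      unsigned          count_get;  /* stands in for sem_get: occupied slots */
      unsigned          count_put;  /* stands in for sem_put: free slots */
      unsigned          msg_size;   /* size of each message in bytes */
      char *            buf;        /* msg_max * msg_size bytes */
      char *            head;       /* assumed: next message to be read */
      char *            tail;       /* assumed: next free slot */
      char *            end;        /* last-plus-one byte of the buffer */
}
MsgRing;

/* Allocates an empty queue; returns 1 on success, 0 on failure. */
int msgring_init(MsgRing *q, unsigned msg_max, unsigned msg_size)
{
      assert(msg_max > 0 && msg_size > 0);
      q->buf = (char *) malloc(msg_max * msg_size);
      if ( !q->buf )
            return 0;
      q->msg_size = msg_size;
      q->count_get = 0;             /* nothing to read yet */
      q->count_put = msg_max;       /* every slot is free */
      q->head = q->tail = q->buf;
      q->end = q->buf + msg_max * msg_size;
      return 1;
}

void msgring_free(MsgRing *q)
{
      free(q->buf);
      q->buf = NULL;
}

/* Copies one message into the queue; returns 0 if no slot is free. */
int msgring_put(MsgRing *q, const void *msg)
{
      if ( q->count_put == 0 )
            return 0;
      q->count_put--;               /* claim a free slot */
      memcpy(q->tail, msg, q->msg_size);
      q->tail += q->msg_size;
      if ( q->tail == q->end )      /* wraparound */
            q->tail = q->buf;
      q->count_get++;               /* announce an occupied slot */
      return 1;
}

/* Copies the oldest message out; returns 0 if the queue is empty. */
int msgring_get(MsgRing *q, void *msg)
{
      if ( q->count_get == 0 )
            return 0;
      q->count_get--;               /* claim an occupied slot */
      memcpy(msg, q->head, q->msg_size);
      q->head += q->msg_size;
      if ( q->head == q->end )
            q->head = q->buf;
      q->count_put++;               /* announce a free slot */
      return 1;
}
```

In this model the sum of the two counters always equals the queue capacity once an operation has completed, which mirrors the initial values of zero and msg_max given to the two semaphores.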

 

The conditional versions of Get...() and Put...() may be called from interrupt handlers, but you must be careful to create the queue without a mutex at the end that will be used by the interrupt handler, i.e., specify false for the serialized_get or serialized_put parameter of CreateMsgQueue().