Christof Meerwald@spezanw.university.edu

home
> study
>> spezanw

Spezielle Anwendersprachen - Threads++, Teil 3

Teil 1, Teil 2, Teil 3

1 kleine Erweiterung für Threads++

Obwohl ich selbst eigentlich kein Fan von Java bin, gefällt mir das in Java eingesetzte Prinzip, alle Threads von einer Klasse Thread abzuleiten, recht gut.

Eine von RunnableThread abgeleitete Klasse startet beim Aufruf des Konstruktors einen neuen Thread, der die Methode run ausführt.

// Runnable.h
class Runnable
{
  friend class RunnableThread;

 protected:
  /// run the object
  virtual void *run() = 0;
};


// RunnableThread.h
class RunnableThread: public Thread, public Runnable
{
 public:
  /// Create the thread by calling the run method.
  RunnableThread();

 private:
  /// wrapper for calling the run method.
  static void *runWrapper(void *arg);
};


// RunnableThread.cc
RunnableThread::RunnableThread()
  : Thread(runWrapper, (void *) static_cast<Runnable *>(this))
{
}

void *RunnableThread::runWrapper(void *arg)
{
  return ((Runnable *) arg)->run();
}

2 Beispiel Message Transmission System

Das Message Transmission System besteht aus einem Communication Center und einem Teil zum Übertragen der Nachrichten. Das Communication Center akzeptiert Nachrichten (wobei eine Bestätigung für die Nachricht zurückgegeben wird), speichert diese intern ab und leitet sie an den Teil, der die Nachrichten überträgt, weiter. Mit der Bestätigung kann später die Nachricht und somit auch der Status der Übertragung der Nachricht abgefragt werden.

Beim Design des Message Transmission Systems wurde ein etwas anderer Weg eingeschlagen, als in den Unterlagen vorgeschlagen. Neben kleineren Klassen wie Message und Receipt verrichten die Klassen CommunicationCenter und Transmitter den Großteil der Arbeit.

Message Transmission System Overview

Die Klasse CommunicationCenter stellt die Schnittstelle des Message Transmission Systems zur Außenwelt über die Methoden deliver und pickUp dar. Die Methoden können konkurrent von mehreren Threads aufgerufen werden und kümmern sich um den wechselseitigen Ausschluss beim Zugriff auf die internen Datenstrukturen.

Um die Implementierung des Programms möglichst einfach zu halten, wurde versucht, auf möglichst viele Sprachmittel und vorgefertigte Bibliotheken von C++ zurückzugreifen. So wird für das Speichern der zu verarbeitenden Nachrichten auf template-Klassen der STL (Standard Template Library) zurückgegriffen:

map<Receipt, Message *> storedMsgs;
legt ein assoziatives Array an, wobei als Schlüssel der Typ Receipt und für die Daten der Typ Message * (also ein Zeiger auf Message) verwendet wird.
queue<Message *> deliverMsgs;
eine Queue vom Typ Message *

Im Gegensatz zu Java sind solche Container-Klassen nicht von vornherein thread-safe, sodass man sich da selbst um einen wechselseitigen Ausschluss beim Zugriff kümmern muss. Auch der Zugriff auf die Daten ist vielleicht auf den ersten Blick etwas eigenartig gelöst, wie zB in CommunicationCenter::getMessageForTransmission zu sehen ist.

// CommunicationCenter.h
class CommunicationCenter
{
 public:
  CommunicationCenter(int nrTransmitters = 5, int nrMessages = 1000);

  Receipt &deliver(Message &msg);

  Message &pickUp(Receipt &rcpt);

  Message &getMessageForTransmission();

 private:
  int nrTransmitters;
  int nrMessages;

  Transmitter *trans;

  map<Receipt, Message *> storedMsgs;
  queue<Message *> deliverMsgs;
  Mutex deliverMutex, storedMutex;
};


// CommunicationCenter.cc
Receipt &CommunicationCenter::deliver(Message &msg)
{
  // check if it is really a new message
  if (msg.getStatus() != Message::NewMessage)
  {
    throw duplicate_message();
  }
  else
  {
    Lock storedLock(storedMutex);

    // check if we have some space left for the message
    if (storedMsgs.size() < nrMessages)
    {
      // generate a receipt
      Receipt *rcpt = new Receipt();

      // store a reference to the message
      storedMsgs[*rcpt] = &msg;
      {
	// put the message in the deliver queue
	Lock deliverLock(deliverMutex);
	deliverMsgs.push(&msg);
      }
      // try to wake up a output processor for delivering the message
      Transmitter::newMsgNotify();

      // return the receipt
      return *rcpt;
    }
    else
    {
      throw communication_center_is_full();
    }
  }
}

Message &CommunicationCenter::pickUp(Receipt &rcpt)
{
  Lock storedLock(storedMutex);

  // find the message using the receipt
  map<Receipt, Message *>::iterator p = storedMsgs.find(rcpt);

  // check if we have found the message
  if (p != storedMsgs.end())
  {
    Message &msg = *p->second;

    // check the message status
    if (msg.getStatus() == Message::NewMessage)
    {
      throw message_is_awaiting_transmission();
    }
    else
    {
      // delete the message if it has been processed by an output
      // processor
      storedMsgs.erase(p);
    }

    // return the message
    return msg;
  }
  else
  {
    throw invalid_receipt();
  }
}

Message &CommunicationCenter::getMessageForTransmission()
{
  Lock deliverLock(deliverMutex);

  // check if there are messages in the transmission queue
  if (!deliverMsgs.empty())
  {
    // get the first message and remove it from the queue
    Message &msg = *deliverMsgs.front();
    deliverMsgs.pop();

    return msg;
  }
  else
  {
    throw underflow_error("no message in queue");
  }
}

Die Klasse Transmitter ist von RunnableThread abgeleitet, womit für jeden Transmitter ein eigener Thread gestartet wird. Ein Transmitter holt sich aus einer Queue eine Referenz auf die nächste zu sendende Nachricht (CommunicationCenter::getMessageForTransmission). Wurde die Übertragung der Nachricht abgeschlossen, wird der Status der Nachricht aktualisiert.

// Transmitter.h
class Transmitter: private RunnableThread
{
 public:
  Transmitter();

  static void newMsgNotify();

  static void setCommCenter(CommunicationCenter *commCenter);

 protected:
  void *run();

 private:
  static CommunicationCenter *commCenter;

  static Mutex newMsgMutex;
  static Condition newMsgCond;

  static int nrWaiting;
};


// Transmitter.cc
void *Transmitter::run()
{
  Message *msg;

  while (true)
  {
    try
    {
      // try to get message from communication center
      msg = &commCenter->getMessageForTransmission();
    }
    catch (underflow_error &e)
    {
      // wait for message notification
      Lock newMsgLock(newMsgMutex);

      nrWaiting++;
      newMsgCond.wait();
      nrWaiting--;

      continue;
    }

    // transmit message...
    sleep(2);

    // set message status to delivered
    msg->setStatus(Message::HasBeenSent);
  }
}

void Transmitter::newMsgNotify()
{
  Lock newMsgLock(newMsgMutex);
  newMsgCond.signal();
}

2.1 Source-Code

Hier gibt's den Source-Code dieses Beispiels.


This Web page is licensed under the Creative Commons Attribution - NonCommercial - Share Alike License. Any use is subject to the Privacy Policy.

Revision: 1.10, cmeerw.org/study/spezanw/
Last modified: Mon Sep 03 18:20:50 2018
Christof Meerwald <cmeerw@cmeerw.org>
XMPP: cmeerw@cmeerw.org