Un client m’a interrogé récemment sur la mise en œuvre d’une notification d’événements entre threads. Je l’ai tout naturellement aiguillé vers l’emploi d’une variable condition pthread_cond_t
. Toutefois lorsque les notifications se sont produites par rafales rapides, des problèmes se sont posés, que je n’ai pu résoudre qu’avec l’emploi d’un sémaphore supplémentaire.
Contexte
Un programme industriel reçoit des données en provenance d’un port de communication RS-485. Lorsqu’une trame complète est reçue, des opérations doivent être réalisées nécessitant des émissions de données, des attentes éventuelles, ceci en parallèle sur la réception de la trame suivante. J’ai donc proposé à mon client de concevoir cette fonctionnalité avec deux threads, l’un se chargeant de recevoir les données, puis de notifier le second afin qu’il traite la suite des opérations.
Pour assurer la synchronisation entre les deux, il m’a semblé judicieux d’utiliser une variable-condition.
Variable condition
Une variable-condition est simplement une donnée de type pthread_cond_t
partagée entre deux threads. J’ai pour habitude, lorsque je présente cette structure de synchronisation durant une session de formation, de la comparer à une cloche. Un thread peut s’endormir, passivement, en attente sur la cloche et un autre thread peut venir à tout moment le réveiller en donnant un coup de marteau sur la cloche.
Si aucun thread n’est attente au moment du coup de marteau, tant pis, cette notification est perdue.
C’est pour éviter cela, que l’on associe toujours une variable condition avec un mutex pthread_mutex_t
. Ainsi, on pourra garantir qu’au moment du coup de marteau, un thread sera systématiquement en attente d’un réveil.
Voyons un exemple d’utilisation. Le thread qui attend les notifications se présente généralement ainsi.
pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; int main(void) { // ... initialisation ... pthread_mutex_lock(& mtx); while (1) { pthread_cond_wait(& cnd, & mtx); // notification recue, traitement... // ... } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
Le second thread, qui doit notifier le premier travaille ainsi :
void * fonction_thread (void * arg) { while (1) { // Attendre les données externes // .... // Notifier le thread main() pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); } }
Contrairement à ce que l’on pourrait croire au premier regard, il n’y a pas d’erreur dans le code ci-dessus, il y a bien deux pthread_mutex_lock()
invoqués dans les deux threads. Ce qu’il faut comprendre c’est que pthread_cond_wait()
contient plusieurs étapes successives :
- relâchement du mutex (comme avec
pthread_mutex_unlock()
) ; - mise en sommeil en attente de notification ;
- reprise du mutex (comme
pthread_mutex_lock()
).
L’astuce est que les deux premiers points sont atomiquement liés. Ainsi, il n’est pas possible qu’un thread invoque pthread_cond_signal()
sans que le premier ne soit véritablement en attente.
NB: On notera que pthread_cond_wait()
présente également deux autres particularités inattendues : la possibilité que le thread soit réveillé prématurément sans qu’une notification ne soit survenue, et le fait qu’il s’agisse d’un point d’annulation si pthread_cancel()
est invoqué dans un autre thread. Ceci dépasse le cadre de cet article et n’a pas d’importance pour le sujet traité ici.
Premier essai
Dans ce premier exemple, le thread principal crée un thread de notification puis se met en attente sur la variable-condition. Les notifications se produiront régulièrement toutes les secondes. Les deux threads décriront leurs progressions sur la sortie d’erreur.
exemple-condition-01.c: #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; void * thread_notification (void * arg) { struct timeval tv; while (1) { sleep(1); pthread_mutex_lock(& mtx); gettimeofday(& tv, NULL); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); fprintf(stderr, "Notification envoyée à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } return NULL; } int main (void) { pthread_t thr; struct timeval tv; pthread_mutex_lock(& mtx); if (pthread_create(& thr, NULL, thread_notification, NULL) != 0) exit(EXIT_FAILURE); while (1) { pthread_cond_wait(& cnd, & mtx); gettimeofday(& tv, NULL); fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
L’exécution semble correcte.
$ ./exemple-condition-01 Notification envoyée à: 1376546517.395413 Notification reçue à 1376546517.395442 Notification envoyée à: 1376546518.395572 Notification reçue à 1376546518.395599 Notification envoyée à: 1376546519.395732 Notification reçue à 1376546519.395761 Notification envoyée à: 1376546520.395891 Notification reçue à 1376546520.395920 Notification envoyée à: 1376546521.396049 Notification reçue à 1376546521.396077 Notification envoyée à: 1376546522.396267 Notification reçue à 1376546522.396295 Notification envoyée à: 1376546523.396430 Notification reçue à 1376546523.396460 Notification envoyée à: 1376546524.396594 Notification reçue à 1376546524.396624 ^C $
Rafales de notifications
Vérifions ce qui se produit si plusieurs notifications sont envoyées rapidement. Pour cela nous allons remplacer, dans le thread, la séquence
pthread_mutex_lock(& mtx); gettimeofday(& tv, NULL); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); fprintf(stderr, "Notification envoyée à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
par
pthread_mutex_lock(& mtx); gettimeofday(& tv, NULL); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
L’exécution est beaucoup moins bonne qu’auparavant.
$ ./exemple-condition-02 Trois notifications envoyées à: 1376546967.599002 Notification reçue à 1376546967.599041 Trois notifications envoyées à: 1376546968.599216 Notification reçue à 1376546968.599246 Trois notifications envoyées à: 1376546969.599377 Notification reçue à 1376546969.599406 Trois notifications envoyées à: 1376546970.599540 Notification reçue à 1376546970.599567 ^C $
Bien que nous envoyions trois notifications à chaque fois, une seule d’entre elles est reçue par le thread principal.
Que se passe-t-il ?
- Le thread main est endormi sur l’appel
pthread_cond_wait()
. Il a relâché le mutex. - Le thread de notification prend le mutex.
- Le thread de notification invoque
pthread_cond_signal()
. Cela réveille le thread principal, qui essaye de reprendre le mutex. Celui-ci n’étant pas libre, le thread main se rendort en attente du mutex. - Le thread de notification lâche le mutex. Cet appel va réveiller le thread principal. Néanmoins ce dernier ne récupère pas instantanément le mutex, il faudra pour cela qu’il soit sélectionné par l’ordonnanceur et puisse s’exécuter.
- L’ordonnanceur n’ayant aucune raison de préempter le thread de notification au profit du thread principal, ce dernier reste en attente (dans l’état « Prêt ») alors que le premier peut continuer son exécution et reprendre à nouveau le mutex.
- Le thread de notification peut appeler une seconde fois
pthread_cond_signal()
. Puis la même séquence d’opérations se reproduit une seconde fois, et ce n’est que lorsque le thread de notification s’endort explicitement avecsleep()
que le thread principal est activé et peut sortir dupthread_cond_wait()
.
Trois notifications ont été envoyées. Une seule a été détectée. Ce n’est pas un comportement très fiable !
Amélioration
J’avais déjà exploré un problème assez similaire en octobre 2011 dans l’article Prise de mutex et priorités.
Nous avions vu alors que pour résoudre le problème de reprise de mutex, il était possible de s’appuyer sur l’appel système sched_yield()
qui représente un appel direct au scheduler en se plaçant volontairement dans une situation d’ordonnancement défavorable.
Nous allons donc tenter d’améliorer notre système en ajoutant deux choses.
- Une priorité temps réel pour le thread principal plus élevée que la priorité temps partagé du thread de notification ;
- Des appels systématiques
sched_yield()
après lepthread_mutex_unlock()
qui suit lepthread_cond_signal()
.
exemple-condition-03.c #include <pthread.h> #include <sched.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; void * thread_notification (void * arg) { struct timeval tv; while (1) { sleep(1); pthread_mutex_lock(& mtx); gettimeofday(& tv, NULL); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sched_yield(); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sched_yield(); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sched_yield(); fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } return NULL; } int main (void) { pthread_t thr; struct timeval tv; struct sched_param param; pthread_mutex_lock(& mtx); if (pthread_create(& thr, NULL, thread_notification, NULL) != 0) exit(EXIT_FAILURE); param.sched_priority = 10; if (pthread_setschedparam(pthread_self(), SCHED_FIFO, & param) != 0) { perror("pthread_setschedparam"); exit(EXIT_FAILURE); } while (1) { pthread_cond_wait(& cnd, & mtx); gettimeofday(& tv, NULL); fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
Le premier inconvénient de ce programme, c’est la nécessité de l’exécuter avec les droits root afin qu’il puisse prendre un ordonnancement temps réel.
De plus, l’exécution n’est pas vraiment plus concluante que le précédent.
$ sudo ./exemple-condition-03 Trois notifications envoyées à: 1376554376.792935 Notification reçue à 1376554376.792975 Trois notifications envoyées à: 1376554377.793192 Notification reçue à 1376554377.793307 Trois notifications envoyées à: 1376554378.793395 Notification reçue à 1376554378.793428 Trois notifications envoyées à: 1376554379.793623 Notification reçue à 1376554379.793654 Trois notifications envoyées à: 1376554380.793855 Notification reçue à 1376554380.793886 Trois notifications envoyées à: 1376554381.794084 Notification reçue à 1376554381.794117 Trois notifications envoyées à: 1376554382.794310 Notification reçue à 1376554382.794350 Trois notifications envoyées à: 1376554383.794489 Notification reçue à 1376554383.794519 Trois notifications envoyées à: 1376554384.794717 Notification reçue à 1376554384.794750
Aucune des triples notifications n’a été reçue en plus d’un exemplaire.
Pourtant, si on laisse le programme tourner un moment, surtout si la charge système augmente un peu, voici ce que l’on constate.
Trois notifications envoyées à: 1376554386.795122 Notification reçue à 1376554387.795484 Notification reçue à 1376554387.795570 Notification reçue à 1376554387.795632 Trois notifications envoyées à: 1376554387.795452 Trois notifications envoyées à: 1376554388.795782 Notification reçue à 1376554388.795809 Notification reçue à 1376554389.795982 Trois notifications envoyées à: 1376554389.795960 Notification reçue à 1376554390.796222 Trois notifications envoyées à: 1376554390.796203 Notification reçue à 1376554391.796478 Notification reçue à 1376554391.796548 Trois notifications envoyées à: 1376554391.796461 Notification reçue à 1376554392.796768 Trois notifications envoyées à: 1376554392.796748 Notification reçue à 1376554393.797030 Trois notifications envoyées à: 1376554393.797011 Notification reçue à 1376554394.797294 Trois notifications envoyées à: 1376554394.797275
Dans certaines circonstances, il arrive que les notifications se comportent comme nous l’attendions. Pourtant cela ne se produit que de temps à autres, de manière a priori imprévisible.
Ce genre de comportement apparement peu déterministe doit nous faire penser à des problèmes d’exécutions parallèles, plus particulièrement sur les systèmes multicœurs (ou multiprocesseurs).
L’exécution précédente se déroulait sur un petit processeur à deux cœurs. Essayons de forcer l’exécution du programme sur un seul CPU.
$ sudo taskset -c 0 ./exemple-condition-03 Notification reçue à 1376555394.075277 Notification reçue à 1376555394.075430 Notification reçue à 1376555394.075463 Trois notifications envoyées à: 1376555394.075235 Notification reçue à 1376555395.075686 Notification reçue à 1376555395.075776 Notification reçue à 1376555395.075832 Trois notifications envoyées à: 1376555395.075653 Notification reçue à 1376555396.076056 Notification reçue à 1376555396.076171 Notification reçue à 1376555396.076204 Trois notifications envoyées à: 1376555396.076025 Notification reçue à 1376555397.076408 Notification reçue à 1376555397.076497 Notification reçue à 1376555397.076527 Trois notifications envoyées à: 1376555397.076370 Notification reçue à 1376555398.076710 Notification reçue à 1376555398.076795 Notification reçue à 1376555398.076853 Trois notifications envoyées à: 1376555398.076676 Notification reçue à 1376555399.077044 Notification reçue à 1376555399.077135 Notification reçue à 1376555399.077166 Trois notifications envoyées à: 1376555399.077010
Le thread principal étant plus prioritaire, c’est lui qui affiche d’abord ses messages (« Notification reçue ») avant que le thread de notification affiche le sien, néanmoins nous pouvons vérifier à l’aide des horodatages que l’exécution est cohérente.
Dans la situation où notre système fonctionne sur un processeur unicœur, la solution sched_yield()
et priorité temps réel est suffisante pour assurer le bon déroulement du programme. Chaque fois que le thread de notification invoque sched_yield()
, l’ordonnanceur laisse l’exécution au thread principal qui peut obtenir le mutex et traiter la notification.
Toutefois ceci ne fonctionne donc pas sur un processeur multicœur si les deux threads tournent sur deux CPU différents (cas habituel lorsque le système n’est pas trop chargé) : lorsque le thread de notification appelle sched_yield()
, l’ordonnanceur voit qu’il est seul sur son CPU et le réactive immédiatement ce qui lui permet de reverrouiller le mutex avant que le thread principal ait eu le temps de se réveiller.
Cette solution n’est donc pas satisfaisante. Il nous faudrait un moyen de s’assurer que le thread principal a bien traité toutes les notifications antérieures avant de lui en renvoyer une nouvelle. Pour cela nous pouvons essayer d’utiliser un sémaphore.
Emploi d’un sémaphore
Un sémaphore est un objet de synchronisation doté d’un compteur que l’on peut incrémenter (avec la fonction sem_post()
) ou décrémenter (avec sem_wait()
). Le compteur doit toujours rester positif ou nul. La décrémentation du compteur est une opération potentiellement bloquante tant que le compteur est nul.
Contrairement aux objets de synchronisation comme les mutex, l’incrémentation et la décrémentation d’un sémaphore ne se produisent pas nécessairement dans la même tâche. Il est parfaitement usuel de voir un thread invoquer sem_wait()
pour attendre une certaine circonstance qui lui sera indiquée par un autre thread invoquant sem_post()
.
Notre première approche de l’utilisation d’un sémaphore sera donc en remplacement du sched_yield()
pour attendre que le thread principal ait eu le temps de traiter la notification avant de lui en renvoyer une nouvelle.
Voici un exemple de code.
exemple-condition-04.c #include <pthread.h> #include <semaphore.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; sem_t sem; void * thread_notification (void * arg) { struct timeval tv; while (1) { sleep(1); pthread_mutex_lock(& mtx); gettimeofday(& tv, NULL); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx); sem_wait(& sem); fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } return NULL; } int main (void) { pthread_t thr; struct timeval tv; sem_init(& sem, 0, 0); pthread_mutex_lock(& mtx); if (pthread_create(& thr, NULL, thread_notification, NULL) != 0) exit(EXIT_FAILURE); while (1) { pthread_cond_wait(& cnd, & mtx); gettimeofday(& tv, NULL); sem_post(& sem); fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec); } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
Ainsi, le thread devra, après avoir envoyé une notification, attendre que le thread principal incrémente le sémaphore (initialement nul) pour pouvoir continuer. Vérifions le fonctionnement.
$ ./exemple-condition-04 Notification reçue à 1376605365.617369 Notification reçue à 1376605365.617557 Notification reçue à 1376605365.617594 Trois notifications envoyées à: 1376605365.617332 Notification reçue à 1376605366.617822 Notification reçue à 1376605366.617897 Notification reçue à 1376605366.617988 Trois notifications envoyées à: 1376605366.617791 Notification reçue à 1376605367.618231 Notification reçue à 1376605367.618374 Notification reçue à 1376605367.618429 Trois notifications envoyées à: 1376605367.618200 Notification reçue à 1376605368.618599 Notification reçue à 1376605368.618679 Notification reçue à 1376605368.618763 Trois notifications envoyées à: 1376605368.618572 Notification reçue à 1376605369.618967 Notification reçue à 1376605369.619217 Notification reçue à 1376605369.619293 Trois notifications envoyées à: 1376605369.618938 Notification reçue à 1376605370.619516 Notification reçue à 1376605370.619593 Notification reçue à 1376605370.619682 Trois notifications envoyées à: 1376605370.619487 ^C $
Cette méthode fonctionne très bien dans notre cas. Aussi bien sur un système multicœur que sur un processeur unicœur.
Notifications parallèles
Toutefois, en regardant le code proposé à mon client j’ai été pris d’un doute. Dans l’exemple ci-dessus, un seul thread envoyait des notifications au thread main, mais que se passera-t-il si plusieurs threads essayent d’en envoyer simultanément ?
Supposons que deux threads, appelons-les A et B, entament simultanément la portion de code suivante :
pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& amp; mtx); sem_wait(& sem);
Voici ce qui peut se produire :
- Le thread A prend le mutex.
- (Sur un autre CPU) le thread B réclame le mutex. Ce dernier étant déjà verrouillé, B reste bloqué.
- A invoque
pthread_cond_signal()
qui réveille le thread main. Celui-ci tente d’obtenir le mutex (danspthread_cond_wait
), qui est verrouillé. Le thread main se rendort. - A relâche le mutex et se bloque en attente sur le sémaphore. La libération du mutex réveille B et main. L’ordonnanceur choisit… B (par malchance).
- B obtient le mutex, et envoie la seconde notification sur la variable condition sans que le thread main n’ait traité la première.
- B relâche le mutex et se bloque sur le sémaphore
- Le thread principal obtient enfin le mutex, fait un traitement et incrémente une seule fois le sémaphore alors que deux threads sont bloqués en attente dessus.
Non seulement notre programme rate des notifications, mais en outre des threads peuvent se retrouver définitivement bloqués par erreur !
Pour vérifier le comportement, j’ai modifié quelque peu le programme précédent. Nous n’affichons plus les dates de notification, mais la valeur de deux compteurs. L’un est incrémenté à chaque envoi de notification et le second à chaque réception. Le thread principal affiche régulièrement ces deux compteurs. En outre, nous lançons cinq threads en parallèle.
exemple-condition-05.c #include <pthread.h> #include <semaphore.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; sem_t sem; int compteur_notifications_envoyees = 0; int compteur_notifications_recues = 0; void * thread_notification (void * arg) { while (1) { sleep(1); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); sem_wait(& sem); } return NULL; } #define NB_THREADS 5 int main (void) { pthread_t thr [NB_THREADS]; int i; sem_init(& sem, 0, 0); pthread_mutex_lock(& mtx); for (i = 0; i < NB_THREADS; i ++) if (pthread_create(& (thr[i]), NULL, thread_notification, NULL) != 0) exit(EXIT_FAILURE); while (1) { pthread_cond_wait(& cnd, & mtx); compteur_notifications_recues ++; sem_post(& sem); fprintf(stderr, "Notifications envoyees : %d, recues : %d\n", compteur_notifications_envoyees, compteur_notifications_recues); } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
L’exécution confirme qu’un problème se pose car les compteurs sont très vite décalés.
$ ./exemple-condition-05 Notifications envoyees : 1, recues : 1 Notifications envoyees : 3, recues : 2 Notifications envoyees : 7, recues : 3 Notifications envoyees : 8, recues : 4 Notifications envoyees : 9, recues : 5 Notifications envoyees : 10, recues : 6 Notifications envoyees : 11, recues : 7 Notifications envoyees : 12, recues : 8 Notifications envoyees : 13, recues : 9 Notifications envoyees : 14, recues : 10 Notifications envoyees : 15, recues : 11 Notifications envoyees : 16, recues : 12 Notifications envoyees : 17, recues : 13 Notifications envoyees : 18, recues : 14 Notifications envoyees : 19, recues : 15 Notifications envoyees : 20, recues : 16 Notifications envoyees : 21, recues : 17 Notifications envoyees : 22, recues : 18 Notifications envoyees : 23, recues : 19 Notifications envoyees : 24, recues : 20 ^C $
Dès que les compteurs sont décalés de 4 incrémentations, nous pouvons imaginer que 4 threads sur les 5 sont bloqués sur le sémaphore.
Comment pouvons-nous donc procéder pour être sûr que toutes les notifications envoyés par les threads soient reçues par le thread main ?
Il existe une solution très simple (et très proche du programme ci-dessus). Je vous encourage à y réfléchir quelques instants avant de poursuivre votre lecture.
Solution
Le problème qui se pose, nous l’avons bien vu précédemment est que deux threads peuvent entrer simultanément dans la portion de code
pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); pthread_mutex_unlock(& mtx);
Pour garantir l’unicité d’exécution d’une portion de programme, nous pouvons justement faire appel à un sémaphore. Au même sémaphore que celui que nous utilisions précédemment.
Il suffit simplement de déplacer l’appel sem_wait()
avant la portion de code ci-dessus. Et symétriquement déplacer sem_post()
avant pthread_cond_wait()
.
Le sémaphore aura pour rôles de garantir
- qu’un seul thread pourra exécuter la fonction de notification à un moment donné
- qu’avant d’exécuter la notification, nous sommes sûrs que le thread principal est en attente
Voici donc le programme correct qui résoud notre problème.
exemple-condition-06.c #include <pthread.h> #include <semaphore.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> pthread_cond_t cnd = PTHREAD_COND_INITIALIZER; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; sem_t sem; int compteur_notifications_envoyees = 0; int compteur_notifications_recues = 0; void * thread_notification (void * arg) { while (1) { sleep(1); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); sem_wait(& sem); pthread_mutex_lock(& mtx); pthread_cond_signal(& cnd); compteur_notifications_envoyees ++; pthread_mutex_unlock(& mtx); } return NULL; } #define NB_THREADS 5 int main (void) { pthread_t thr [NB_THREADS]; int i; sem_init(& sem, 0, 0); pthread_mutex_lock(& mtx); for (i = 0; i < NB_THREADS; i ++) if (pthread_create(& (thr[i]), NULL, thread_notification, NULL) != 0) exit(EXIT_FAILURE); while (1) { sem_post(& sem); pthread_cond_wait(& cnd, & mtx); compteur_notifications_recues ++; fprintf(stderr, "Notifications envoyees : %d, recues : %d\n", compteur_notifications_envoyees, compteur_notifications_recues); } pthread_mutex_unlock(& mtx); return EXIT_SUCCESS; }
NB : J’ai placé le sem_wait()
avant le pthread_mutex_lock()
. En effet, j’essaye généralement d’éviter les imbrications de mécanismes de synchronisation, de crainte du fameux dead lock si difficile à débuger. Je pense toutefois que le sem_wait()
pourrait sans problème se trouver entre le pthread_mutex_lock()
et le pthread_cond_signal()
.
Vérifions le fonctionnement :
$ ./exemple-condition-06 Notifications envoyees : 1, recues : 1 Notifications envoyees : 2, recues : 2 Notifications envoyees : 3, recues : 3 Notifications envoyees : 4, recues : 4 Notifications envoyees : 5, recues : 5 Notifications envoyees : 6, recues : 6 Notifications envoyees : 7, recues : 7 Notifications envoyees : 8, recues : 8 Notifications envoyees : 9, recues : 9 Notifications envoyees : 10, recues : 10 Notifications envoyees : 11, recues : 11 Notifications envoyees : 12, recues : 12 Notifications envoyees : 13, recues : 13 Notifications envoyees : 14, recues : 14 Notifications envoyees : 15, recues : 15 Notifications envoyees : 16, recues : 16 Notifications envoyees : 17, recues : 17 Notifications envoyees : 18, recues : 18 Notifications envoyees : 19, recues : 19 Notifications envoyees : 20, recues : 20 Notifications envoyees : 21, recues : 21 Notifications envoyees : 22, recues : 22 Notifications envoyees : 23, recues : 23 Notifications envoyees : 24, recues : 24 Notifications envoyees : 25, recues : 25 Notifications envoyees : 26, recues : 26 Notifications envoyees : 27, recues : 27 Notifications envoyees : 28, recues : 28 Notifications envoyees : 29, recues : 29 Notifications envoyees : 30, recues : 30 Notifications envoyees : 31, recues : 31 Notifications envoyees : 32, recues : 32 Notifications envoyees : 33, recues : 33 Notifications envoyees : 34, recues : 34 Notifications envoyees : 35, recues : 35 Notifications envoyees : 36, recues : 36 Notifications envoyees : 37, recues : 37 Notifications envoyees : 38, recues : 38 Notifications envoyees : 39, recues : 39 Notifications envoyees : 40, recues : 40 Notifications envoyees : 41, recues : 41 Notifications envoyees : 42, recues : 42 Notifications envoyees : 43, recues : 43 Notifications envoyees : 44, recues : 44 Notifications envoyees : 45, recues : 45 Notifications envoyees : 46, recues : 46 Notifications envoyees : 47, recues : 47 Notifications envoyees : 48, recues : 48 Notifications envoyees : 49, recues : 49 Notifications envoyees : 50, recues : 50 Notifications envoyees : 51, recues : 51 Notifications envoyees : 52, recues : 52 Notifications envoyees : 53, recues : 53 Notifications envoyees : 54, recues : 54 Notifications envoyees : 55, recues : 55 Notifications envoyees : 56, recues : 56 Notifications envoyees : 57, recues : 57 Notifications envoyees : 58, recues : 58 Notifications envoyees : 59, recues : 59 Notifications envoyees : 60, recues : 60 Notifications envoyees : 61, recues : 61 Notifications envoyees : 62, recues : 62 Notifications envoyees : 63, recues : 63 Notifications envoyees : 64, recues : 64 Notifications envoyees : 65, recues : 65 Notifications envoyees : 66, recues : 66 Notifications envoyees : 67, recues : 67 Notifications envoyees : 68, recues : 68 Notifications envoyees : 69, recues : 69 Notifications envoyees : 70, recues : 70 Notifications envoyees : 71, recues : 71 Notifications envoyees : 72, recues : 72 Notifications envoyees : 73, recues : 73 Notifications envoyees : 74, recues : 74 Notifications envoyees : 75, recues : 75 ^C $
Conclusion
Nous avons réussi à obtenir un schéma robuste et performant pour traiter dans un thread des notifications asynchrones déclenchées par d’autres threads. On voit que la solution n’était pas si évidente que nous pouvions le penser au début.
Il faut comprendre que l’API des Pthreads, conçue dans les années 1990, souffre de quelques défauts lorsqu’on l’emploie avec des processeurs multicœurs, qui n’apparaissaient pas à l’époque sur des systèmes unicœurs.
Nous pouvons également noter que la solution reposant sur des priorités temps réel ne fonctionne pas bien, et qu’il est parfois nécessaire d’ajouter un objet de synchronisation supplémentaire pour venir à bout des problèmes posés par la concurrence d’accès.
L’ensemble des fichiers sources et Makefile se trouvent dans cette archive.
Remarques, commentaires, etc. sont les bienvenus !
Pourquoi ne pas avoir utilisé une file de message mq_receive/mq_send pour synchroniser vos tâches : la(les) tâche(s) qui lise(nt) la ligne série écrit(vent) dans la file, la tâche qui traite les données les reçoit par la file ?
Oui, ce serait probablement une bonne méthode. Il faudrait vérifier les performances (temps de réveil de la tâche en attente) par rapport aux variables-condition, mais ça ne devrait pas être très différent. J’essayerai ça prochainement.
Pour assouplir le système et permettre à plusieurs tâches de poster des notifications alors que la tâche de réception est déjà occupée, on peut allonger la file de message (qui par défaut ne contient que 10 messages seulement) en utilisant mq_getattr()/mq_setattr().
En outre il est possible de donner une priorité aux messages, donc d’avoir des notifications (très prioritaires) prises en compte avant d’autres (moins prioritaires) postées auparavant.
Notons quand même que le comportement n’est pas tout à fait identique puisque la tâche envoyant une notification n’est pas obligée d’attendre que celle de réception soit en attente.
Oui on peut imaginer que les performances sont les mêmes dans cet exemple où l’on posterais seulement un entier dans la file dans chaque message. Peut-être que l’on peut gagner sur le nombre d’appel systèmes _dans_le_pire_cas_ avec 1 seul appel système mq_send au lieu de quatre mutex_lock/cond_signal/mutex_unlock/sem_wait dans l’exemple.
Dans nos applications, on n’utilise jamais le mécanisme de priorité de messages dans les files car nous trouvons que ça peut compliquer le design de la tâche de réception. S’il y a besoin de deux niveaux de traitement (rapide/moins rapide), on crée deux files et deux tâches de priorité différentes qui reçoivent sur chacune des deux files. La tâche de plus haute priorité traitera les messages qui doivent être traités rapidement.
Pour retrouver le même comportement que dans l’exemple de l’article, c’est à dire qu’une tâche doit attendre que la notification soit lue pour être débloquée, il existe l’appel rt_task_reply dans Xenomai. Ceci dit, je n’aime pas ce type d’appel car l’intérêt des files de message est de découpler totalement le traitement de la notification, qui peuvent donc s’exécuter avec des priorités différentes sur des coeurs différents.
Avec rt_task_reply ou comme dans votre exemple, la durée du traitement peut donc bloquer l’appelant assez longtemps. Il faut donc trouver un moyen de recouvrer ce temps dans la tâche de notification ce qui n’est pas évident si l’on veut garder toutes ses tâches les plus simples possibles.
Cet article semble manquer un élément clef lié au variable condition: le thread qui attend la notification doit tester la véracité de la condition qui a déclanché la notification de la part d’un autre thread.
Il me semble que vous pourriez obtenir ainsi des solutions plus simples/performantes aux problèmes approchés dans cet article ( en outre d’obtenir une solution POSIX conforme ).