Programowanie współbieżne

W systemie działa serwer i pewna liczba procesów. Każdy proces jest albo ugodowy albo neutralny albo marudny. Typ procesu podaje się przy jego uruchomieniu w linii poleceń: proces u, proces n lub proces m

Proces w pętli nieskończonej:

wykonuje własne sprawy, czyli po prostu usypia na losowy czas --- od 1 do 10 sekund,
następnie wypisuje na standardowy strumień diagnostyczny komunikat "Proces %d, czekam na zasób.n", zgłasza się do serwera i czeka, aż serwer przydzieli mu zasoby,
otrzymuje od serwera informacje o typie przydzielonego zasobu,
korzysta z zasobu --- wypisuje na standardowy strumień diagnostyczny komunikat o typie otrzymanego zasobu: "Proces %d, otrzymalem zasób %d.n" i pracuje z zasobem, czyli usypia na losowy czas --- od 1 do 10 sekund
wypisuje na standardowy strumień diagnostyczny komunikat "Proces %d, skonczylemn" i oddaje zasób serwerowi.
Wszystkie komunikaty są w formacie stosowanym w fprintf i mają zawierać PID procesu. Poza tym procesy nie wypisują żadnych innych komunikatów.
Zarządzaniem procesami i zasobami zajmuje się serwer. Zarządza on zasobami dwóch typów: lepszym i gorszym. Jest on wywoływany z dwoma parametrami: N - oznaczającym liczbę dostępnych zasobów lepszych, M - oznaczającym liczbę dostępnych zasobów gorszych. Serwer komunikuje się z procesami za pomocą kolejki (lub kolejek) komunikatów. W pierwszej kolejności odbiera on komunikaty o zwrocie zasobów, które obsługuje sam. W następnej kolejności odbiera komunikaty z prośbą o przydzielenie zasobu. Po odbiorze takiego komunikatu serwer tworzy wątek odpowiedzialny za obsługę tego żądania i ponownie oczekuje na zlecenia od procesów.

Wątek odpowiedzialny za obsługę żądań działa w następujący sposób:

jeżeli żądanie pochodzi od procesu marudnego, wątek oczekuje aż będzie dostępny przynajmniej jeden egzemplarz zasobu lepszego, po czym wysyła do procesu komunikat z informacją o przydzieleniu mu zasobu i kończy się,
jeżeli żądanie pochodzi od procesu neutralnego, wątek oczekuje aż będzie dostępny przynajmniej jeden egzemplarz zasobu dowolnego typu (preferując jednak o ile to możliwe zasoby lepsze), po czym wysyła do procesu komunikat z informacją o typie przydzielonego zasobu i kończy się,
jeżeli żądanie pochodzi od procesu ugodowego, wątek oczekuje aż będzie dostępny przynajmniej jeden egzemplarz zasobu dowolnego typu (bez żadnych preferencji), po czym wysyła do procesu komunikat z informacją o typie przydzielonego zasobu i kończy się.
Zaimplementuj opisany powyżej schemat. Rozwiązanie powinno:

zawierać co najmniej dwa pliki wykonywalne: serwer oraz proces,
wykorzystywać do komunikacji między serwerem a procesami kolejki komunikatów IPC,
wykorzystywać do synchronizacji wątków muteksy i zmienne warunkowe,
dbać o to, aby przerwanie działania serwera za pomocą CTRl+C nie zostawiało po sobie żadnych niezwolnionych zasobów IPC,
zawierać prostą obsługę błędów - wystarczy, że proces wykryjący błąd zakończy się.
Wszystkie pliki źródłowe oraz Makefile należy spakować do jednego pliku i wysłać go jako załącznik na adres mengel@mimuw.edu.pl do dnia 14 czerwca 2010, godz. 23:59. Każdy rozpoczęty tydzień spóźnienia skutkuje ,,premią'' w wysokości -2pkt.

A oto rozwiązanie:

[adsense]

Makefile:


all: serwer proces
serwer: serwer.cpp
	g++ serwer.cpp -o serwer -lpthread
proces: proces.cpp
	g++ proces.cpp -o proces -lpthread

proces.cpp :


#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
using namespace std;

    
struct wiadomo {
    long mtype; 
	int wiadomosc;
};


int main(int argc, char* argv[])
{
	
	//Tworzymy kolejke:
	key_t key = ftok("./", 1);
    int msqid = msgget(key, 0666 | IPC_CREAT);
	srand ( time(NULL) );
	pid_t pid=getpid();
	
	while(1)
	{
		sleep(rand()%10+1);
		fprintf(stderr,"Proces %d, czekam na grupe.n",pid);
		//cerr<<"Proces "<<pid<<", czekam na grupe.n";
		struct wiadomo wiadomosc={1,10};
		//Pukamy do drzwi serwera:
		msgsnd(msqid, &wiadomosc, sizeof(wiadomosc), 0);
		struct wiadomo wiadomosc2;
		//Czekamy na odpowiedz z numerem grupy
		msgrcv(msqid, &wiadomosc2, sizeof(wiadomosc), 2, 0);
		int nr_grupy=wiadomosc2.wiadomosc;
		//cout<<"Przydzielono do grupy: "<<nr_grupy<<"n";
		struct wiadomo wiadomosc3;
		//A moze jakies zasoby?
		msgrcv(msqid, &wiadomosc3, sizeof(wiadomosc), 3+2*nr_grupy, 0);
		int zasobow=wiadomosc3.wiadomosc;
		//cout<<"Przydzielono zasobow: "<<zasobow<<"n";
		
		//No i mamy zasoby. Teraz chcemy sie zesynchronizowac!
		int key2 = ftok("./", 2+nr_grupy);
		int semid = semget(key2,1, 0666 );
		static struct sembuf buf;
		buf.sem_num=0;
		buf.sem_op=-1; //Hapsamy semafor
		buf.sem_flg=0;
		
		semop(semid,&buf,1);
		//cout<<"Jestesmy w sekcji krytycznej. Czekamy "<<zasobow<<" sekund. n";
		//cerr<<"Proces "<<pid<<", otrzymalem "<<zasobow<<" zasobow.n";
		fprintf(stderr,"Proces %d, otrzymalem %d zasobow.n",pid,zasobow);
		
		//Robimy swoje:
		sleep(zasobow);
		
		//cout<<"Opuszczamy. "<<zasobow<<"n";
		buf.sem_op=1;
		semop(semid,&buf,1);
		
		struct wiadomo wiadomosc4;
		wiadomosc4.mtype=4+2*nr_grupy;
		wiadomosc4.wiadomosc=pid;

		//cout<<"["<<wiadomosc4.mtype<<"]n";
		
		msgsnd(msqid, &wiadomosc4, sizeof(wiadomosc4), 0);  //skonczylismy. Na razie! 
		//cerr<<"Proces "<<pid<<", skonczylemn";
		fprintf(stderr,"Proces %d, skonczylemn",pid);
	}
		
	return 0;
}


serwers.cpp :


#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std;


//zmienne
key_t key;
int msqid;
int nr_grupa=1;
int start_zasobow=3;
int zasoby=0;//char2int(argv[1]);
int grupy=3;//char2int(argv[2]);
int watki=2;//char2int(argv[3]);
int potrzeba_zasobow=0;
pthread_mutex_t mut,mut2; 
pthread_cond_t cond,cond_zas1,cond_zas2;
int pytan=0;


struct wiadomo {
    long mtype; 
	int wiadomosc;
};


void* watek(void *argument)
{
	
	
	while(1)
	{
		//Bedziemy czekac na grupe:
		pthread_mutex_lock(&mut);
		if(pytan<grupy)
		{
			pthread_cond_wait(&cond, &mut);
		}
		pytan=pytan-grupy;
		int numer=nr_grupa;
		nr_grupa++;
		pthread_mutex_unlock(&mut);// mutex sluzy nam glownie do czytania numeru.
		//Grupa jest. Do akcji!
		
		//Akceptacja polaczenia z informacja do ktorej grupy przydzial:
		struct wiadomo wiadomosc;
		wiadomosc.mtype=2;
		wiadomosc.wiadomosc=numer;
		for(int i=0;i<grupy;i++)
		{
			msgsnd(msqid, &wiadomosc, sizeof(wiadomosc), 0);
		}
		
		int potrzeba=rand()%(start_zasobow-1)+1;//ile chcemy zasobow?
		//cerr<<"Grupa "<<numer<<" oczekuje na "<<potrzeba<<" zasobow.n";
		fprintf(stderr,"Grupa %d oczekuje na %d zasobow.n",numer,potrzeba);
		//cout<<zasoby<0)
		{
		//cout<<zasoby<<"n";
			pthread_cond_wait(&cond_zas1, &mut2);
		}
		//wpuscili nas do korytarza czekania. mowimy ile potrzebujemy zasobow.
		potrzeba_zasobow=potrzeba;
		while(zasoby<potrzeba_zasobow)
		{
		//cout<<zasoby<<"n";
			pthread_cond_wait(&cond_zas2, &mut2);
		}
		//wpuszczono nas i dano tyle zasobow ile potrzebujemy.
		zasoby=zasoby-potrzeba_zasobow;
		potrzeba_zasobow=0;
		//Wpuszczamy do korytarza czekania.
		pthread_cond_signal(&cond_zas1); 
		pthread_mutex_unlock(&mut2);
		//koniec czekania na zasoby
		
		//cerr<<"Grupa "<<numer<<" otrzymuje "<<potrzeba<<" zasobow.n";
		fprintf(stderr,"Grupa %d otrzymuje %d zasobow.n", numer,potrzeba);
		//informujemy o uzbieranych zasobach.
		//struct wiadomo wiadomosc;
		wiadomosc.mtype=3+2*numer;
		wiadomosc.wiadomosc=potrzeba;
		
		//Tworzymy semaforki
		int key2 = ftok("./", 2+numer);
		int semid = semget(key2,1, 0666| IPC_CREAT);
		semctl(semid,0,SETVAL,1);
		//Utworzone.
			
		for(int i=0;i<grupy;i++)
		{
			msgsnd(msqid, &wiadomosc, sizeof(wiadomosc), 0); 
		}
		//Wyslalismy zasoby. Teraz zbieramy odpowiedzi.
		//Teraz beda porzetwarzac
		int zrodlo=4+2*numer;
		//cout<<"["<<zrodlo<<"]n";
		for(int i=0;i0)&&(zasoby>=potrzeba_zasobow))
		{
			pthread_cond_signal(&cond_zas2); 
			//ktos czeka na zasoby. wpuszczamy
		}
		pthread_mutex_unlock(&mut2);
		fprintf(stderr,"Koniec obslugi grupy %d.n",numer);
		//cerr<<"Koniec obslugi grupy "<<numer<<".n"; 
	}
}




int char2int(char* dane)
{
	int wynik=0;

	for(int i=0;i<strlen(dane);i++)
	{
		wynik=wynik*10+(int)(dane[i]-48);
	}
	return wynik;
}
 
 
void koncz(int sig) 
{
	//Czyszczenie struktur danych.
	//Mutexy i inne takie
	pthread_mutex_destroy(&mut);
	pthread_mutex_destroy(&mut2);
	pthread_cond_destroy(&cond);
	pthread_cond_destroy(&cond_zas1);
	pthread_cond_destroy(&cond_zas2);
	//Kolejki
	msgctl(msqid, IPC_RMID, NULL);
	exit(0);
}
 


	

int main(int argc, char* argv[])
{
	
	if(argc==4)
	{
		start_zasobow=char2int(argv[1]);
		grupy=char2int(argv[2]);
		watki=char2int(argv[3]);
		
		srand ( time(NULL) );
		(void) signal(SIGINT, koncz);//lapanie ctrl-c
		zasoby=start_zasobow;
		//Tworzenie kolejki 'glownej:'
		key = ftok("./", 1);
		msqid = msgget(key, 0666 | IPC_CREAT);
		//Zmienna warunkowa - kolejka procesow.
		pthread_cond_init(&cond, 0);
		//Zmienna warunkowa - tlum czekajacy na zasoby.
		pthread_cond_init(&cond_zas1, 0);
		//Zmienna warunkowa - pokoj czekania.
		pthread_cond_init(&cond_zas2, 0);
		//Mutex czekania na grupe
		pthread_mutex_init(&mut, 0);
		//Mutex czekania na zasoby
		pthread_mutex_init(&mut2, 0);
		//Tworzymy watki:
		
		for(int i=0;i=grupy)
			{
				//Nowa grupa!
				pthread_cond_signal(&cond);
			}
			pthread_mutex_unlock(&mut);
		}
	}
	else
	{
		exit(1);
	}
	
	return 0;
}


Wszystko w paczce:

Program współbieżny

Leave a Reply