Многозадачность и синхронизация в Plan 9: Часть 1

Plan 9 имеет довольно простой механизм управления процессами, здесь нет разделения на потоки и процессы на уровне ОС. Вот пример простейшей программы реализующей многозадачность:
01	#include <u.h>
02	#include <libc.h>
03	
04	void thread(int val)
05	{
06		int i;
07	
08		switch(rfork(RFPROC|RFFDG|RFMEM))
09		{
10			case -1:
11				print("can't fork!");
12				return;
13			case 0:
14				break;
15			default:
16				return;
17		}
18		
19	
20		for(i=0;i<10;i++)
21			print("%d ",i+val);
22		exits(0);
23	}
24
25
26	void main(int,char**)
27	{
28		thread(0);
29		thread(10);
30		exits(0);
31	}
С техникой fork я познакомился изучая Plan 9 поэтому поясню ее и для остальных. Пусть есть определенная часть кода (20-21) которую мы решили выполнять отдельным процессом (даже двумя). Для создания нового процесса мы используем строки 08-17. Основой создания нового процесса является системный вызов rfork. Работа функции rfork сводится к созданию копии процесса (ветвления) родителя (из которого мы ее вызываем), при этом стек процессов разделяется, а память остается общей (если это указано в параметрах флагом RFMEM, иначе память копируется), т.е. все глобальные переменные остаются доступными для обоих потоков. Результат функции возвращается обоим процессам, но разный 0 - для потомка и PID (Process identifier) родителю. Отрицательный результат возвращается родителю в случае невозможности создания нового процесса. Такой метод реализации многозадачности используется во всех Unix-like системах. В нашей программе в случае удачного порождения процесса, родитель получает PID потомка и возвращается по return (16), а процесс-потомок продолжает выполнение (14) со строки 20, где решается поставленная задача, в данном случае выводятся последовательно 10 чисел на консоль. Exits(0) (22) завершает текущий процесс. Для наглядности можно запустить несколько потоков (в нашем случае два) и мы увидим, как их выполнение чередуется.
Параметры, которые передаются функции rfork, позволяют управлять разделением ресурсов между потомком и родителем, они имеют следующие значения:
RFPROCесли установить то будет создан новый процесс, иначе будут изменены параметры вызывающего процесса
RFNOWAITесли установить, то родитель не получит Waitmsg (от потомка) при завершении по exits
RFNAMEGесли установить то потомок получит копию родительского пространства имен, иначе потомок будет использовать общее пространство имен с родителем. Этот флаг исключается RFCNAMEG
RFCNAMEGесли установить то потомок получит чистое пространство имен, которое должен построить посредством mount
RFNOMNTзапретит потомку использование mount
RFENVGесли установить то потомок унаследует копию переменных окружения, иначе будет использовать родительские. Исключается RFCENVG
RFCENVGочищает переменные окружения потомка
RFNOTEGкаждый процесс является членом определенной групы, которая может получить уведомления (notes) через notepg (см. proc(3) ) файл. По умолчанию группа потомка такая же, как и у родителя. Если этот флаг установлен, то процесс отделяется в собственную группу
RFFDGесли установить то потомок получит копию файловых дескрипторов родителя, иначе будет использовать родительские файловые дескрипторы
RFCFDGочищает таблицу файловых дескрипторов потомка
RFRENDесли установлен то процес не сможет rendezvous(2) с предками, но при этом он может rendezvous(2) с потомками. Как результат процесс становится первым в группе разделяющей общее пространство для rendezvous(2)
RFMEMиспользуется только совместно с RFPROC, позволяет потомку использовать сегменты data и bss родителя, иначе используются копии

Процессы в Plan 9 могут иметь 20 уровней приоритетов от 0 до 19, чем выше номер - тем выше приоритет. Также разделяют базовый и рабочий приоритет. Базовый устанавливает пользователь (по умолчанию 10), а система (sheduler, планировщик системы) корректирует его (время от времени) и устанавливает как рабочий. Для управления процессами существует драйвер (файловый сервис) proc (proc(3)). Этот драйвер предоставляет каждый процесс как каталог, имя которого соответствует PID процесса. В таком каталоге находятся файлы, которые необходимы для управления процессом:
ФайлДоступОписание
argsчтение содержит параметры, которые переданы программе (в том числе и имя самого исполняемого файла)
ctlчтение/запись служит для передачи команд управления процессом
fdчтение файловые дескрипторы процесса (первый - текущий каталог)
fpregs чтение/запись регистры FPU (floating point unit)
kregs чтение отображает состояние регистров процессора (уровень ядра)
mem чтение/запись через этот файл можно получить доступ к памяти процесса (исполняемой, данных, стек, стек ядра для этого процесса и.т.д)
note чтение/запись для передачи уведомлений программе (см. notify(2))
noteidчтение/запись для установки групы note (см. rfork, параметр RFNOTEG)
notepgзапись для передачи уведомлений всем программам в групе (см. rfork, параметр RFNOTEG)
ns чтение пространство имен процесса (история выполнения bind и mount, формат соответствует namespace(6))
proc чтение используется для восстановления стека программы отладчиком
profile чтение счетчик частоты выполнения инструкций (см. prof(1))
regs чтение/запись отображает состояние регистров процессора (уровень пользователя)
segmentчтение показывает, какие сегменты подключены к процессу
status чтение содержит данные о состоянии процесса, которые можно описать следующей структурой, записи содержатся в текстовом виде и разделены пробелом (после чтения из status ни одна строка не является null-terminated поэтому необходимо действие типа Name[27]=0 перед обработкой ее функциями типа strcpy, atoi и т.д.)
struct ProcState
{
	char Name[28];		//Имя процесса
	char User[28];		//Имя владельца процесса
	char State[12];		//Состояние процесса
	char CPUtime[6][12];	//копия файла /dev/cputime
	char Pages[12];		//количество занимаемых страниц (1024 байт) памяти
	char NomPri[12];		//базовый приоритет (номинальный)
	char CurPri[12];		//реальный приоритет (в данный момент времени)
};
text чтение копия исполняемого файла, используется отладчиком для доступа к отладочной информации
wait чтение служит для получения сообщение от потомков (о завершении выполнения) (см. wait(2))

Вот примеры функций которые управляют процессом:
int SetPri(int priority)
{
	char buf[256];
	int fd,pid;

	if (!(pid=getpid()))
		return 0;

	if (!sprint(buf,"/proc/%d/ctl",pid))
		return 0;

	if ((fd=open(buf,OWRITE))<0)
		return 0;

	fprint(fd,"pri %d\n",priority);

	close(fd);
	return 1;
}
Эта функция изменяет приоритет процесса, который ее вызвал. Первым делом определяется идентификатор процесса (PID) и вычисляется соответствующий путь к файлу управления, после чего записываем команду pri n (proc(3)), где n - приоритет, и закрываем файл. Процесс получит новый приоритет (если он в пределах от 0 до 19). Для того чтобы узнать текущий приоритет можно использовать следующую функцию:
int GetPri(void)
{
	char buf[256];
	ProcState procstate;
	int fd,pid;

	if (!(pid=getpid()))
		return 0;

	if (!sprint(buf,"/proc/%d/status",pid))
		return 0;

	if ((fd=open(buf,OREAD))<0)
		return 0;

	if (read(fd,&procstate,sizeof(pst))!=sizeof(pst))
		return 0;

	close(fd);
	
	procstate.NomPri[27]=0;
	return atoi(procstate.NomPri);
}
Она отличается от предыдущей, тем что открывает на чтение файл status. После чтения содержимого файла в структуру procstate, текстовый результат переводится в число и возвращается. Используя файл управления ctl можно останавливать и перезапускать процессы, кроме того через этот файл можно получить доступ к real-time механизму Plan 9 (см. proc(3)).

Немного о синхронизации

Одним из способов синхронизации выполнения процессов в Plan 9 является использование rendezvous points - точек встреч. Для начала ожидания один из процессов вызывает rendezvous, указав условленное значение tag, второй процесс "после достижения кондиции" делает тот же вызов, с тем же значением tag. Нет разницы какой из процессов придет "на встречу" первым, после "встречи" каждый получит значение value (тип ulong) от своего "собеседника" и продолжит выполнение. Rendezvous является системным вызовом (см. /sys/src/libc/9syscall/sys.h) ядра Plan 9. Процесс, который пришел на встречу первым переходит в состояние Rendez и находится в нем до "встречи" с другим процессом.
ulong rendezvous(ulong tag, ulong value)
Для успешной "встречи" процессов они должны использовать общее значение rendezvous tag и быть в одной группе (см. RFEND, не путать с группой процессов разделяющих notepg!). В случае "обрыва встречи" rendezvous возвращает ~0. Следующий пример иллюстрирует такую технику. Пусть нам необходимо выполнить сложное вычисление, и мы решаем разбить расчет на части, которые выполним параллельно (так как FPU у нас один, а функции используют именно его, то производительность у нас не повысится, для повышения производительности нужно всё это запустить на 2-х процессорной машине, и при этом развести процессы на разные CPU). Пусть наша функция имеет вид y=sin(x)/cos(x), и соответственно числитель и знаменатель мы будем рассчитывать параллельно.
01	#include <u.h>
02	#include <libc.h>
03	
04	double x;
05	double sinx;
06	
07	void sinthread(void)
08	{
09		for(;;)
10		{
11			switch(rendezvous(1,0))
12			{
13			case -1:
14				print("rendezvous broken\n");
15				exits(0);
16			case 1:
17				exits(0);
18			default:
19				sinx=sin(x);
20				rendezvous(1,0);
21				break;
22			}
23		}
24	}
25	
26	void main(int,char**)
27	{
28		double cosx,y;
29	
30		rfork(RFREND);
31	
32		switch(rfork(RFPROC|RFMEM))
33		{
34			case 0:
35				sinthread();
36				exits(0);
37			case -1:
38				print("can't fork");
39				exits(0);
40		}
41	
42		for (x=0.5;x<10;x+=0.5)
43		{
44			rendezvous(1,0);
45			cosx=cos(x);
46			rendezvous(1,0);
47			y=sinx/cosx;
48			print("x=%f y=%f\n",x,y);
49		}
50	
51		rendezvous(1,1);
52	
53		exits(0);
54	}
В нашем примере можно выделить две основные точки, где необходимо участие "двух сторон" - передача аргумента и передача результата, соответственно и точек встречи будет две (44 и 46). Первая, синхронизирует прием аргумента (44 и 11) (глобальная переменная x), после чего начинается расчет, или принимается решение о прекращении деятельности процесса (16 и 13) . Выполнив первый rendezvous (11) процесс начинает ожидать пока родитель не установит значение аргумента, после чего производит расчет функции (19) и выполняет второй rendezvous для того чтобы встретив (46) родителя сообщить о законченном расчете. Родитель, зная что переменная sinx содержит верное значение, делает окончательный расчет и выводит результат. Закончив цикл, родитель делает последнюю встречу (51), при этом сообщая (передавая значение 1) что программа закончена, тем самым заставляя потомок закончить работу.
Кроме того имеется еще несколько примитивов синхронизации: lock, qlock и rwlock. Рассмотрим их по порядку.
Lock представлен следующим набором функций:
void lock(Lock *l)
int  canlock(Lock *l)
void unlock(Lock *l)
Функция lock приостанавливает действие программы (зацикливается) на время пока не сможет захватить объект Lock, если блокировка не допустима, то можно использовать функцию canlock которая возвращает 0 при невозможности захватить объект Lock или ненулевое значение в случае успеха (объект уже будет захвачен). Функция unlock разблокирует объект Lock. Когда происходит вызов функции lock программа посредством вызова tas (test-and-set) функции пытается захватить объект Lock, при этом после каждой неудачной попытки вызывается sleep(0), после 1000 неудач между попытками вставляется пауза в 100 мс, и еще после 1000 неудач - пауза в 1 сек, и попытки не прекращаются до захвата объекта Lock. Это сделано для предотвращения пустой загрузки процессора в случае длительного захвата Lock, если же такая ситуация возможна то следует использовать qlock.
QLock имеет интерфейс подобный Lock:
void qlock(QLock *l)
int  canqlock(QLock *l)
void qunlock(QLock *l)
Отличие состоит в том что процесс который пытается захватить QLock, приостанавливается (посредством Rendezvous) и ставится в очередь, ожидая того момента, когда будет возможна блокировка объекта QLock. Когда процесс вызывает qlock, и объект QLock при этом находится в "занятом" состоянии то процесс добавляет себя в "очередь" процессов ожидающий захвата объекта и вызывает Rendezvous. Когда процесс "владеющий" объектом вызывает qunlock и видит в очереди процесс, ожидающий освобождения объекта, то он Rendezvous с ним и передает ему объект, оба процесса продолжают выполнение.
RWLock представляет объект для реализации задачи "о читателях и писателях". Имеет двойной набор функций, для доступа к чтению и для доступа к записи. Чтени, возможно когда никто не заблокировал объект для записи, но при любом количестве читающих. Запись возможна только когда объект покинут все читатели и писатели.
Для чтения:
void rlock(RWLock *l)
int  canrlock(RWLock *l)
void runlock(RWLock *l)
Для записи:
void wlock(RWLock *l)
int  canwlock(RWLock *l)
void wunlock(RWLock *l)
RWLock работает на основе QLock. Для инициализации объектов типа Lock, QLock и RWLock необходимо заполнить их нулями.
Кроме того существует расширенная версия Rendezvous points, она поддерживает "встречу" нескольких (более 2-х) процессов.
void rsleep(Rendez *r)
int  rwakeup(Rendez *r)
int  rwakeupall(Rendez *r)
Если процесс требует определенных условий для продолжения он может выполнить rsleep для входа в режим ожидания. После достижения такого условия вызов rwakeup разбудит один из процессов ожидающих на Rendez, как результат эта функция возвратит 1 если процесс "очнулся" или 0 если нет. Rwakeupall пробудит все процессы "спящие" на этой точке (Rendez), и возвратит количество пробужденных процессов. Перед использованием Rendez необходимо указатель l (ell) в структуре Rendez установить на существующий проинициализированный QLock, который будет защищать структуру Rendez, необходимо отметить что с этим объектом (QLock) будут работать как функции которые вызывают rsleep и rwakeup(all) так и они (rsleep и rwakeup(all)) сами. Вот код иллюстрирующий это:
01	#include <u.h>
02	#include <libc.h>
03	#include <thread.h>
04	
05	QLock q;
06	Rendez r={.l=&q};
07	
08	int x,d;
09	Ref cnt={.ref=3};
10	
11	void thread(int n)
12	{
13		int i;
14	
15		switch(rfork(RFPROC|RFMEM))
16		{
17			case -1:
18				print("can't fork");
19				exits(0);
20			case 0:
21				break;
22			default:
23				return;
24		}
25	
26		for(i=0;i<10+n;)
27		{
28			qlock(r.l);
29			x=n;
30			d=i;
31			i+=rwakeup(&r);
32			qunlock(r.l);
33		}
34	
35		decref(&cnt);
36		exits(0);
37	}
38	
39	void main(int,char**)
40	{
41		thread(0);
42		thread(1);
43		thread(2);
44	
45		for(;cnt.ref;)
46		{
47			qlock(r.l);
48			rsleep(&r);
49			print("Receive %d from %d\n",d,x);
50			qunlock(r.l);
51		}
52	
53		exits(0);
54	}
Основной процесс создает три потомка (41-43), а сам засыпает (47-48) в ожидании сообщений от них. Дочерние процессы подготавливают некоторые данные (29-30) и пробуждают (31) основной процесс для их обработки (49). В нашем случае цикл "продвигается" (31) вперед только в случае удачного пробуждения потока-обработчика. На данный момент я не могу дать более точного описания работы функций связанных с Rendez. В примере также используются функции (точнее одна из них) для атомарных операций инкремента и декремента:
void incref(Ref*)
long decref(Ref*)
Они оперируют структурой Ref:
typedef struct Ref {
	long ref;
} Ref;
Функция decref возвращает соответствующее значение которое находится в счетчике. Эти функции являются частью библиотеки libthread которая имеет расширенные средства IPC, в виде каналов (Channel), и средства для создания процессов и нитей.
Хостинг от uCoz