UNIX: взаимодействие процессов — страница 24 из 35

Для измерения времени, уходящего на синхронизацию при использовании различных средств, мы создаем некоторое количество потоков (от одного до пяти, согласно табл. А.4 и А.5), каждый из которых увеличивает счетчик в разделяемой памяти большое количество раз, используя различные формы синхронизации для получения доступа к счетчику.

Взаимные исключения Posix

В листинге А.19 приведены глобальные переменные и функция main пpoгрaммы, измеряющей быстродействие взаимных исключений Posix.

Листинг А.19. Глобальные переменные и функция main для взаимных исключений Posix

//bench/incr_pxmutex1.с

1  #include "unpipc.h"

2  #define MAXNTHREADS 100


3  int nloop;

4  struct {

5   pthread_mutex_t mutex;

6   long counter;

7  } shared = {

8   PTHREAD_MUTEX_INITIALIZER

9  };

10 void *incr(void *);


11 int

12 main(int argc, char **argv)

13 {

14  int i, nthreads;

15  pthread_t tid[MAXNTHREADS];

16  if (argc != 3)

17   err_quit("usage: incr_pxmutex1 <#loops><#threads>");

18  nloop = atoi(argv[1]);

19  nthreads = min(atoi(argv[2]), MAXNTHREADS);

20  /* блокировка взаимного исключения */

21  Pthread_mutex_lock(&shared.mutex);

22  /* создание потоков */

23  Set_concurrency(nthreads);

24  for (i = 0; i < nthreads; i++) {

25   Pthread_create(&tid[i], NULL, incr, NULL);

26  }

27  /* запуск таймера и разблокирование взаимного исключения */

28  Start_time();

29  Pthread_mutex_unlock(&shared.mutex);

30  /* ожидание завершения работы потоков */

31  for (i = 0; i < nthreads; i++) {

32   Pthread_join(tid[i], NULL);

33  }

34  printf("microseconds: %.0f usec\n", Stop_time());

35  if (shared.counter != nloop * nthreads)

36   printf("error: counter = %ld\n", shared, counter);

37  exit(0);

38 }

Общие данные

4-9 Совместно используемые потоками данные состоят из взаимного исключения и счетчика. Взаимное исключение инициализируется статически.

Блокирование взаимного исключения и создание потоков

20-26 Основной поток блокирует взаимное исключение перед созданием прочих потоков, чтобы ни один из них не получил это исключение до тех пор, пока все они не будут созданы. Вызывается функция set_concurrency, создаются потоки. Каждый поток выполняет функцию incr, текст которой будет приведен позже. 

Запуск таймера и разблокирование взаимного исключения

27-36 После создания всех потоков главный поток запускает таймер и освобождает взаимное исключение. Затем он ожидает завершения всех потоков, после чего останавливает таймер и выводит полное время работы. В листинге А.20 приведен текст функции incr, выполняемой каждым из потоков.

Листинг А.20. Функция incr, выполняемая потоками

//bench/incr_pxmutex1.c

39 void *

40 incr(void *arg)

41 {

42  int i;

43  for (i = 0; i < nloop; i++) {

44   Pthread_mutex_lock(&shared.mutex);

45   shared.counter++;

46   Pthread_mutex_unlock(&shared.mutex);

47  }

48  return(NULL);

49 }

Увеличение счетчика — критическая область кода

44-46 Операция увеличения счетчика осуществляется после получения блокировки на взаимное исключение. После этого взаимное исключение разблокируется.

Блокировки чтения-записи

Пpoгрaммa, использующая блокировки чтения-записи, является слегка измененной версией программы с взаимными исключениями Posix. Поток должен установить блокировку файла, прежде чем увеличивать общий счетчик.

ПРИМЕЧАНИЕ

Существует не так уж много систем, в которых реализованы блокировки чтения-записи, являющиеся частью стандарта Unix 98 и разрабатываемые рабочей группой Posix.1j. Измерения в этом разделе проводились в системе Solaris 2.6 с использованием блокировок, описанных в документации на странице rwlock(3T). Эта реализация обеспечивает тот же набор функций, что и предлагаемые блокировки чтения-записи Posix. Для использования этих функций мы применяем тривиальные функции-обертки.

В Digital Unix 4.0B мы использовали блокировки чтения-записи поточно-независимых служб, описанные на странице документации tis_rwlock. Мы не приводим листингов с несущественными изменениями, необходимыми для использования этих блокировок.

В листинге А.21 приведен текст функции main, а в листинге А.22 — текст функции incr.

Листинг А.21. Функция main для блокировок чтения-записи

//bench/incr_rwlock1.c

1  #include "unpipc.h"

2  #include  /* Заголовочный файл для Solaris */


3  void Rw_wrlock(rwlock_t *rwptr);

4  void Rw_unlock(rwlock_t *rwptr);

5  #define MAXNTHREADS 100

6  int nloop;

7  struct {

8   rwlock_t rwlock; /* тип данных Solaris */

9   long counter;

10  } shared; /* инициализация О –> USYNC_THREAD */

11  void *incr(void *);


12 int

13 main(int argc, char **argv)

14 {

15  int i, nthreads;

16  pthread_t tid[MAXNTHREADS];

17  if (argc != 3)

18   err_quit("usage: incr_rwlockl <#loops><#threads>");

19  nloop = atoi(argv[1]);

20  nthreads = min(atoi(argv[2]), MAXNTHREADS);

21  /* получение блокировки на запись */

22  Rw_wrlock(&shared.rwlock);

23  /* создание всех потоков */

24  Set_concurrency(nthreads);

25  for (i = 0; i < nthreads; i++) {

26   Pthread_create(&tid[i], NULL, incr, NULL);

27  }

28  /* запуск таймера и снятие блокировки */

29  Start_time();

30  Rw_unlock(&shared.rwlock);

31  /* ожидание завершения всех потоков */

32  for (i = 0; i < nthreads; i++) {

33   Pthread_join(tid[i], NULL);

34  }

35  printf("microseconds: %.0f usec\n", Stop_time());

36  if (shared.counter != nloop * nthreads)

37   printf("error: counter = %ld\n", shared.counter);

38  exit(0);

39 }

Листинг А.22. Увеличение общего счетчика с использованием блокировок чтения-записи

//bench/incr_rwlock1.c

40 void *

41 incr(void *arg)

42 {

43  int i;

44  for (i = 0; i < nloop; i++) {

45   Rw_wrlock(&shared.rwlock);

46   shared.counter++;

47   Rw_unlock(&shared.rwlock);

48  }

49  return(NULL);

50 }

Семафоры Posix, размещаемые в памяти

Мы измеряем скорость работы семафоров Posix (именованных и размещаемых в памяти). В листинге А.24 приведен текст функции main, а в листинге А.23 — текст функции incr.

Листинг А.23. Увеличение счетчика с использованием семафоров Posix в памяти

//bench/incr_pxsem1.с

37 void *

38 incr(void *arg)

39 {

40  int i;

41  for (i = 0; i < nloop; i++) {

42   Sem_wait(&shared.mutex);

43   shared.counter++;

44   Sem_post(&shared.mutex);

45  }

46  return(NULL);

47 }

Листинг А.24. Функция main для семафоров Posix, размещаемых в памяти

//bench/incr_pxsem1.с

1  #include "unpipc.h"

2  #define MAXNTHREADS 100

3  int nloop;

4  struct {

5   sem_t mutex; /* размещаемый в памяти семафор */

6   long counter;

7  } shared;

8  void *incr(void *);


9  int

10 main(int argc, char **argv)

11 {

12  int i, nthreads;

13  pthread_t tid[MAXNTHREADS];

14  if (argc != 3)

15   err_quit("usage: incr_pxseml <#loops><#threads>");

16  nloop = atoi(argv[1]);

17  nthreads = min(atoi(argv[2]), MAXNTHREADS);

18  /* инициализация размещаемого в памяти семафора 0 */

19  Sem_init(&shared.mutex, 0, 0);

20  /* создание всех потоков */

21  Set_concurrency(nthreads);

22  for (i = 0; i < nthreads; i++) {

23   Pthread_create(&tid[i], NULL, incr, NULL);

24  }

25  /* запуск таймера и разблокирование семафора */

26  Start_time();

27  Sem_post(&shared.mutex);

28  /* ожидание завершения всех потоков */

29  for (i = 0; i < nthreads; i++) {

30   Pthread_join(tid[i], NULL);

31  }

32  printf("microseconds: %.0f usec\n", Stop_time());

33  if (shared.counter != nloop * nthreads)

34   printf("error: counter = %ld\n", shared.counter);

35  exit(0);

36 }

18-19 Создается семафор, инициализируемый значением 0. Второй аргумент в вызове sem_init, имеющий значение 0, говорит о том, что семафор используется только потоками вызвавшего процесса.

20-27 После создания всех потоков запускается таймер и вызывается функция sem_post.

Именованные семафоры Posix

В листинге А.26 приведен текст функции main, измеряющей быстродействие именованных семафоров Posix, а в листинге А.25 — соответствующая функция incr.

Листинг А.25. Увеличение общего счетчика с использованием именованного семафора Posix

//bench/incr_pxsem2.c

40 void *

41 incr(void *arg)

42 {

43  int i;

44  for (i = 0; i < nloop; i++) {

45   Sem_wait(shared.mutex);

46   shared.counter++;

47   Sem_post(shared.mutex);

48  }

49  return(NULL);

50 }

Листинг А.26. Функция main для измерения быстродействия именованных семафоров Posix

//bench/incr_pxsem2.с

1  #include "unpipc.h"

2  #define MAXNTHREADS 100

3  #define NAME "incr_pxsem2"


4  int nloop;

5  struct {

6   sem_t *mutex; /* указатель на именованный семафор */

7   long counter;

8  } shared;

9  void *incr(void *);


10 int

11 main(int argc, char **argv)

12 {

13  int i, nthreads;

14  pthread_t tid[MAXNTHREADS];

15  if (argc != 3)

16   err_quit("usage: incr_pxsem2 <#loops><#threads>");

17  nloop = atoi(argv[1]);

18  nthreads = min(atoi(argv[2]), MAXNTHREADS);

19  /* инициализация именованного семафора 0 */

20  sem_unlink(Px_ipc_name(NAME)); /* ошибка – OK */

21  shared.mutex = Sem_open(Px_ipc_name(NAME), O_CREAT | O_EXCL, FILE_MODE, 0);

22  /* создание всех потоков */

23  Set_concurrency(nthreads);

24  for (i = 0; i < nthreads; i++) {

25   Pthread_create(&tid[i], NULL, incr, NULL);

26  }

27  /* запуск таймера и разблокирование семафора */

28  Start_time();

29  Sem_post(shared.mutex);

30  /* ожидание завершения всех потоков */

31  for (i = 0; i < nthreads; i++) {

32   Pthread_join(tid[i], NULL);

33  }

34  printf("microseconds: %.0f usec\n", Stop_time());

35  if (shared.counter != nloop * nthreads)

36   printf("error: counter = %ld\n", shared.counter);

37  Sem_unlink(Px_ipc_name(NAME));

38  exit(0);

39 }

Семафоры System V

Функция main программы, измеряющей быстродействие семафоров System V, приведена в листинге А.27, а функция incr показана в листинге А.28.

Листинг А.27. Функция main для измерения быстродействия семафоров System V

//bench/incr_svsem1.c

1  #include "unpipc.h"

2  #define MAXNTHREADS 100


3  int nloop;

4  struct {

5   int semid;

6   long counter;

7  } shared;

8  struct sembuf postop, waitop;

9  void *incr(void *);


10 int

11 main(int argc, char **argv)

12 {

13  int i, nthreads;

14  pthread_t tid[MAXNTHREADS];

15  union semun arg;

16  if (argc != 3)

17   err_quit("usage: incr_svseml <#loops><#threads>");

18  nloop = atoi(argv[1]);

19  nthreads = min(atoi(argv[2]), MAXNTHREADS);

20  /* создание семафора и инициализация его значением 0 */

21  shared.semid = Semget(IPC_PRIVATE, 1, IPC_CREAT | SVSEM_MODE);

22  arg.val =0;

23  Semctl(shared.semid, 0, SETVAL, arg);

24  postop.sem_num = 0; /* инициализация двух структур semop */

25  postop.sem_op = 1;

26  postop.sem_flg = 0;

27  waitop.sem_num = 0;

28  waitop.sem_op = –1;

29  waitop.sem_flg = 0;

30  /* создание всех потоков */

31  Set_concurrency(nthreads);

32  for (i = 0; i < nthreads; i++) {

33   Pthread_create(&tid[i], NULL, incr, NULL);

34  }

35  /* запуск таймера и разблокирование семафора */

36  Start_time();

37  Semop(shared.semid, &postop, 1); /* up by 1 */

38  /* ожидание завершения всех потоков */

39  for (i = 0; i < nthreads; i++) {

40   Pthread_join(tid[i], NULL);

41  }

42  printf("microseconds: %.0f usec\n", Stop_time());

43  if (shared.counter != nloop * nthreads)

44   printf("error: counter = %ld\n", shared, counter);

45  Semctl(shared.semid, 0, IPC_RMID);

46  exit(0);

47 }

Листинг А.28. Увеличение общего счетчика с использованием семафоров System V

//bench/incr_svsem1.c

48 void *

49 incr(void *arg)

50 {

51  int i;

52  for (i = 0; i < nloop; i++) {

53   Semop(shared.semid, &waitop, 1);

54   shared.counter++;

55   Semop(shared.semid, &postop, 1);

56  }

57  return(NULL);

58 }

20-23 Создается семафор с одним элементом, значение которого инициализируется нулем.

24-29 Инициализируются две структуры semop: одна для увеличения семафора, а другая для ожидания его изменения. Обратите внимание, что поле sem_flg в обеих структурах имеет значение 0: флаг SEM_UNDO не установлен.

Семафоры System V с флагом SEM_UNDO

Единственное отличие от пpoгрaммы из листинга А.27 заключается в том, что поле sem_flg структур semop устанавливается равным SEM_UNDO, а не 0. Мы не приводим в книге новый листинг с этим небольшим изменением.

Блокировка записей fcntl

Последняя пpoгрaммa использует fcntl для синхронизации. Функция main приведена в листинге А.30. Эта программа будет выполняться успешно только в том случае, если количество потоков равно 1, поскольку блокировка fcntl предназначена для использования между процессами, а не между потоками одного процесса. При указании нескольких потоков каждый из них всегда имеет возможность получить блокировку (то есть вызовы writew_lock не приводят к остановке потока, потому что процесс уже является владельцем блокировки), и конечное значение счетчика оказывается неправильным.

Функция incr, использующая блокировку записей, приведена в листинге А.29.

Листинг А.29. Увеличение общего счетчика с использованием блокировки записей fcntl

//bench/incr_fcntl1.e

44 void *

45 incr(void *arg)

46 {

47  int i;

48  for (i = 0; i < nloop; i++) {

49   Writew_lock(shared.fd, 0, SEEK_SET, 0);

50   shared.counter++;

51   Un_lock(shared.fd, 0, SEEK_SET, 0);

52  }

53 return(NULL);

54 }

Листинг А.30. Функция main для измерения производительности блокировки fcntl

//bench/incr_fcntl1.e

4  #include "unpipc.h"

5  #define MAXNTHREADS 100


6  int nloop;

7  struct {

8   int fd;

9   long counter;

10 } shared;

11 void *incr(void *);


12 int

13 main(int argc, char **argv)

14 {

15  int i, nthreads;

16  char *pathname;

17  pthread_t tid[MAXNTHREADS];

18  if (argc != 4)

19   err_quit("usage: incr_fcntll <#loops><#threads>");

20  pathname = argv[1];

21  nloop = atoi(argv[2]);

22  nthreads = min(atoi(argv[3]), MAXNTHREADS);

23  /* создание файла и получение блокировки на запись */

24  shared.fd = Open(pathname, O_RDWR | O_CREAT | O_TRUNC, FILE_MODE);

25  Writew_lock(shared.fd, 0, SEEK_SET, 0);

26  /* создание всех потоков */

27  Set_concurrency(nthreads);

28  for (i = 0; i < nthreads; i++) {

29   Pthread_create(&tid[i], NULL, incr, NULL);

30  }

31  /* запуск таймера и снятие блокировки на запись */

32  Start_time();

33  Un_lock(shared.fd, 0, SEEK_SET, 0);

34  /* ожидание завершения всех потоков */

35  for (i = 0; i < nthreads; i++) {

36   Pthread_join(tid[i], NULL);

37  }

38  printf("microseconds: %.0f usec\n", Stop_time());

39  if (shared.counter != nloop * nthreads)

40   printf("error: counter = %ld\n", shared.counter);

41  Unlink(pathname);

42  exit(0);

43 }

15-19 Полное имя создаваемого и используемого для блокировки файла принимается в качестве аргумента командной строки. Это позволяет измерять скорость работы для разных файловых систем. Можно ожидать, что программа будет работать гораздо медленнее при использовании NFS (если она вообще будет работать, то есть если сервер и клиент NFS поддерживают блокировку записей NFS).

А.6. Синхронизация процессов: программы