Программирование на языке Ruby

Фултон Хэл

Глава 13. Потоки в Ruby

 

 

Потоки еще иногда называют облегченными процессами. Это просто способ обеспечить параллельное выполнение без накладных расходов, связанных с контекстным переключением между процессами. (Впрочем, общего согласия по поводу того, что такое поток, нет, поэтому мы не будем углубляться в данный вопрос.)

В Ruby потоки определены на пользовательском уровне и не зависят от операционной системы. Они работают в DOS так же, как и в UNIX. Но, конечно, это снижает производительность, а на сколько именно, зависит от операционной системы.

Потоки полезны, например, тогда, когда некоторые части программы могут работать независимо друг от друга. Применяются они и в тех случаях, когда приложение тратит много времени на ожидание события. Часто, пока один поток ждет, другой может выполнять полезную работу.

С другой стороны, у потоков есть и недостатки. Всегда надо взвешивать, оправданно ли их применение в конкретном случае. К тому же, иногда доступ к ресурсу принципиально должен осуществляться строго последовательно, поэтому потоки не дадут никакого выигрыша. И, наконец, бывает так, что накладные расходы на синхронизацию доступа к глобальным ресурсам превышают экономию, достигаемую за счет использования нескольких потоков.

По этой и ряду других причин некоторые авторитеты вообще рекомендуют держаться подальше от многопоточного программирования. Действительно, такие программы сложны и подвержены ошибкам, которые трудно отлаживать. Но мы оставим читателю самому решать, когда стоит применять эту технику.

Проблемы, связанные с несинхронизированными потоками, хорошо известны. При одновременном доступе к глобальным данным со стороны нескольких потоков данные могут быть запорчены. Если один поток делает какое-то допущение о том, что успел выполнить другой поток, возможна гонка (race condition); обычно это приводит к «недетерминированному» коду, который дает разные результаты при каждом запуске. Наконец, существует опасность тупиковой ситуации, когда ни один поток не может продолжить выполнение, поскольку ожидает ресурс, занятый другим потоком. Код, написанный так, что ни одна из этих проблем не возникает, называется безопасным относительно потоков.

Не все в Ruby безопасно относительно потоков, но имеются методы синхронизации, которые позволяют контролировать доступ к переменным и ресурсам, защищать критические секции программы и избегать тупиковых ситуаций. Мы рассмотрим их в этой главе и проиллюстрируем на примерах.

 

13.1. Создание потоков и манипулирование ими

 

К числу основных операций над потоками относятся создание потока, передача ему входной информации и получение результатов, останов потока и т.д. Можно получить список запущенных потоков, опросить состояние потока и выполнить ряд других проверок.

Ниже представлен обзор основных операций.

 

13.1.1. Создание потоков

Создать поток просто: достаточно вызвать метод new и присоединить блок, который будет исполняться в потоке.

thread = Thread.new do

 # Предложения, исполняемые в потоке...

end

Возвращаемое значение — объект типа Thread. Главный поток программы может использовать его для управления вновь созданным потоком.

А если нужно передать потоку параметры? Достаточно передать их методу Thread.new, который, в свою очередь, передаст их блоку.

a = 4

b = 5

с = 6

thread2 = Thread.new(а,b,с) do |a, x, y|

 # Манипуляции с a, x и y.

end

# Если переменная а будет изменена новым потоком,

# то главный поток не получит об этом никакого уведомления.

Параметры блока, являющиеся ссылками на существующие переменные, практически неотличимы от самих переменных. Поэтому, например, переменная а в каком-то смысле «опасна», что и отражено в комментарии.

Поток может также обращаться к переменным из той же области видимости, в которой был создан. Ясно, что без синхронизации это может стать источником проблем. Главный и любой другой поток могут изменять такую переменную независимо друг от друга, и результаты подобных действий непредсказуемы.

x = 1

y = 2

thread3 = Thread.new do

 # Этот поток может манипулировать переменными x and y

 # из внешней области видимости, но это не всегда безопасно.

 sleep(rand(0)) # Спать в течение случайно выбранного времени

 # (меньше секунды).

 x = 3

end

sleep(rand(0))

puts x

# Если запустить эту программу несколько раз подряд, то может быть

# напечатано как 1, так и 3!

У метода new есть синоним fork — это имя выбрано по аналогии с хорошо известным системным вызовом в UNIX.

 

13.1.2. Доступ к локальным переменным потока

Мы знаем об опасности доступа из потока к переменным, определенным вне его области видимости, но мы также знаем, что у потока могут быть локальные данные. А что делать, если поток хочет «обнародовать» часть принадлежащих ему данных?

Для этой цели предусмотрен специальный механизм. Если объект Thread рассматривать как хэш, то к локальным данным потока можно обратиться из любого места в области видимости этого объекта. Мы не хотим сказать, что так можно обратиться к настоящим локальным переменным; это допустимо лишь для доступа к именованным данным, своим для каждого потока.

Существует также метод key?, который сообщает, используется ли указанное имя в данном потоке.

Внутри потока к таким данным тоже следует обращаться, как к хэшу. Метод Thread.current позволяет сделать запись чуть менее громоздкой.

thread = Thread.new do

t = Thread.current

t[:var1] = "Это строка"

t[:var2] = 365

end

# Доступ к локальным данным потока извне...

x = thread[:var1]              # "Это строка"

y = thread[:var2]              # 365

has_var2 = thread.key?("var2") # true

has_var3 = thread.key?("var3") # false

Отметим, что эти данные доступны другим потокам даже после того, их владелец завершил работу (как в данном случае).

Помимо символа (см. выше), для идентификации локальной переменной потока можно употреблять и строки.

thread = Thread.new do

t = Thread.current

t["var3"] = 25

t[:var4] = "foobar"

end

a = thread[:var3] = 25

b = thread["var4"] = "foobar"

He путайте эти специальные имена с настоящими локальными переменными. В следующем фрагменте разница видна более отчетливо:

thread = Thread.new do

 t = Thread.current

 t["var3"] = 25

 t[:var4] = "foobar"

 var3 = 99         # Настоящие локальные переменные

 var4 = "zorch"    # (извне недоступны)

end

a = thread[:var3]  # 25

b = thread["var4"] # "foobar"

И еще отметим, что ссылку на объект (на настоящую локальную переменную) внутри потока можно использовать для сокращенной записи. Это справедливо, если вы сохраняете одну и ту же ссылку, а не создаете новую.

thread = Thread.new do

 t = Thread.current

 x = "nXxeQPdMdxiBAxh"

 t[:my_message] = x

 x.reverse!

 x.delete! "x"

 x.gsub!(/[A-Z]/,"")

 # С другой стороны, присваивание создает новый объект,

 # поэтому "сокращение" становится бесполезным...

end

а = thread[:my_message] # "hidden"

Ясно, что сокращение не будет работать и в том случае, когда вы имеете дело с объектами наподобие Fixnum, которые хранятся как непосредственные значения, а не ссылки.

 

13.1.3. Опрос и изменение состояния потока

В классе Thread есть несколько полезных методов класса. Метод list возвращает массив «живых» потоков, метод main возвращает ссылку на главный поток программы, который породил все остальные, а метод current позволяет потоку идентифицировать самого себя.

t1 = Thread.new { sleep 100 }

t2 = Thread.new do

 if Thread.current == Thread.main

 puts "Это главный поток." # HE печатается,

end

1.upto(1000) { sleep 0.1 }

end

count = Thread.list.size   # 3

if Thread.list.include ?(Thread.main)

 puts "Главный поток жив." # Печатается всегда!

end

if Thread.current == Thread.main

 puts "Я главный поток."   # Здесь печатается...

end

Методы exit, pass, start, stop и kill служат для управления выполнением потоков (как изнутри, так и извне):

# в главном потоке...

Thread.kill(t1)        # Завершить этот поток.

Thread.pass            # Передать управление t2.

t3 = Thread.new do

 sleep 20

 Thread.exit           # Выйти из потока.

 puts "Так не бывает!" # Никогда не выполняется.

end

Thread.kill(t2)        # Завершить t2.

# Выйти из главного потока (все остальные тоже завершаются).

Thread.exit

Отметим, что не существует метода экземпляра stop, поэтому поток может приостановить собственное выполнение, но не выполнение другого потока.

Существуют различные методы для опроса состояния потока. Метод экземпляра alive? сообщает, является ли данный поток «живым» (не завершил выполнение), а метод stop? — находится ли он в состоянии «приостановлен».

count = 0

t1 = Thread.new { loop { count += 1 } }

t2 = Thread.new { Thread.stop }

sleep 1

flags = [t1.alive?, # true

         t1.stop?,  # false

         t2.alive?, # true

         t2.stop?]  # true

Получить состояние потока позволяет метод status. Он возвращает значение "run", если поток выполняется; "sleep" — если он приостановлен, спит или ожидает результата ввода/вывода; false — если поток нормально завершился, и nil — если поток завершился в результате исключения.

t1 = Thread.new { loop {} }

t2 = Thread.new { sleep 5 }

t3 = Thread.new { Thread.stop }

t4 = Thread.new { Thread.exit }

t5 = Thread.new { raise "exception" }

s1 = t1.status # "run"

s2 = t2.status # "sleep"

s3 = t3.status # "sleep"

s4 = t4.status # false

s5 = t5.status # nil

Глобальную переменную $SAFE можно установить по-разному в разных потоках. Стало быть, она вовсе не является глобальной, но стоит ли жаловаться на это, если она позволяет разным потокам работать с разным уровнем безопасности? Метод safe_level возвращает текущий уровень безопасности потока.

t1 = Thread.new { $SAFE = 1; sleep 5 }

t2 = Thread.new { $SAFE = 3; sleep 5 }

sleep 1

lev0 = Thread.main.safe_level # 0

lev1 = t1.safe_level          # 1

lev2 = t2.safe_level          # 3

Метод доступа priority позволяет узнать и изменить приоритет потока:

t1 = Thread.new { loop { sleep 1 } }

t2 = Thread.new { loop { sleep 1 } }

t2.priority = 3  # Установить для потока t2 приоритет 3

p1 = t1.priority # 0

p2 = t2.priority # 3

Поток с большим приоритетом будет чаще получать процессорное время. Специальный метод pass позволяет передать управление планировщику. Иными словами, поток просто уступает свой временной квант, но не приостанавливается и не засыпает.

t1 = Thread.new do

 puts "alpha"

 Thread.pass

 puts "beta"

end

t2 = Thread.new do

 puts "gamma"

 puts "delta"

end

t1.join

t2.join

В этом искусственном примере вызов Thread.pass приводит к печати строк в следующем порядке: alpha gamma delta beta. Без него было бы напечатано alpha beta gamma delta. Конечно, этот механизм следует использовать не для синхронизации, а только для экономного расходования процессорного времени.

Выполнение приостановленного потока можно возобновить методами методами run или wakeup:

t1 = Thread.new do

 Thread.stop

 puts "Здесь есть изумруд."

end

t2 = Thread.new do

 Thread.stop

 puts "Вы находитесь в точке Y2."

end

sleep 1

t1.wakeup

t2.run

Между этими методами есть тонкое различие. Метод wakeup изменяет состояние потока, так что он становится готовым к выполнению, но не запускает его немедленно. Метод же run пробуждает поток и сразу же планирует его выполнение.

В данном случае t1 просыпается раньше t2, но t2 планируется первым, что приводит к следующему результату:

Вы находитесь в точке Y2.

Здесь есть изумруд.

Конечно, было бы неосмотрительно реализовывать синхронизацию на основе этого механизма.

Метод экземпляра raise возбуждает исключение в потоке, от имени которого вызван. (Этот метод необязательно вызывать в том потоке, которому адресовано исключение.)

factorial1000 = Thread.new do

 begin

  prod = 1

  1.upto(1000) {|n| prod *= n }

  puts "1000! = #{prod}"

 rescue

  # Ничего не делать...

 end

end

sleep 0.01 # На вашей машине значение может быть иным.

if factorial1000.alive?

 factorial1000.raise("Стоп!")

 puts "Вычисление было прервано!"

else

 puts "Вычисление успешно завершено."

end

Поток, запущенный в предыдущем примере, пытался вычислить факториал 1000. Если для этого не хватило одной сотой секунды, то главный поток завершит его. Как следствие, на относительно медленной машине будет напечатано сообщение «Вычисление было прервано!» Что касается части rescue внутри потока, то в ней мог бы находиться любой код, как, впрочем, и всегда.

 

13.1.4. Назначение рандеву (и получение возвращенного значения)

Иногда главный поток хочет дождаться завершения другого потока. Для этой цели предназначен метод join:

t1 = Thread.new { do_something_long() }

do_something_brief()

t1.join # Ждать завершения t1.

Отметим, что вызывать метод join необходимо, если нужно дождаться завершения другого потока. В противном случае главный поток завершится, а вместе с ним и все остальные. Например, следующий код никогда не напечатал бы окончательный ответ, не будь в конце вызова join:

meaning_of_life = Thread.new do

 puts "Смысл жизни заключается в..."

 sleep 10

 puts 42

end

sleep 9

meaning_of_life.join

Существует полезная идиома, позволяющая вызвать метод join для всех «живых» потоков, кроме главного (ни один поток, даже главный, не может вызывать join для самого себя).

Thread.list.each { |t| t.join if t != Thread.main }

Конечно, любой поток, а не только главный, может вызвать join для любого другого потока. Если главный поток и какой-то другой попытаются вызвать join друг для друга, возникнет тупиковая ситуация. Интерпретатор обнаружит это и завершит программу.

thr = Thread.new { sleep 1; Thread.main.join }

thr.join # Тупиковая ситуация!

С потоком связан блок, который может возвращать значение. Следовательно, и сам поток может возвращать значение. Метод value неявно вызывает join и ждет, пока указанный поток завершится, а потом возвращает значение последнего вычисленного в потоке выражения.

max = 10000

thr = Thread.new do

 sum = 0

 1.upto(max) { |i| sum += i }

 sum

end

guess = (max*(max+1))/2

print "Формула "

if guess == thr.value

 puts "правильна."

else

 puts "неправильна."

end

 

13.1.5. Обработка исключений

Что произойдет, если в потоке возникнет исключение? Как выясняется, поведение можно сконфигурировать заранее.

Существует флаг abort_on_exception, который работает как на уровне класса, так и на уровне экземпляра. Он реализован в виде метода доступа (то есть позволяет читать и устанавливать атрибут) на обоих уровнях. Если abort_on_exception для некоторого потока равен true, то при возникновении в этом потоке исключения будут завершены и все остальные потоки.

Thread.abort_on_exception = true

t1 = Thread.new do

 puts "Привет!"

 sleep 2

 raise "some exception"

 puts "Пока!"

end

t2 = Thread.new { sleep 100 }

sleep 2

puts "Конец"

В этом примере флаг abort_on_exception установлен в true на уровне системы в целом (отменяя подразумеваемое по умолчанию значение). Следовательно, когда в потоке t1 возникает исключение, завершаются и t1, и главный поток. Печатается только слово «Привет!».

В следующем примере эффект такой же:

t1 = Thread.new do

 puts "Привет!"

 sleep 2

 raise "some exception"

 puts "Пока!"

end

t1.abort_on_exception = true

t2 = Thread.new { sleep 100 }

sleep 2

puts "Конец"

А вот в следующем оставлено принимаемое по умолчанию значение false, и мы наконец-то видим слово «Конец», печатаемое главным потоком (слова «Пока!» мы не увидим никогда, поскольку поток t1 при возникновении исключения завершается безусловно).

t1 = Thread.new do

 puts "Привет!"

 sleep 2

 raise "some exception"

 puts "Пока!"

end

t2 = Thread.new { sleep 100 }

sleep 2

puts "Конец"

# Выводится:

Привет!

Конец

 

13.1.6. Группы потоков

Группа потоков — это механизм управления логически связанными потоками. По умолчанию все потоки принадлежат группе Default (это константа класса). Но если создать новую группу, то в нее можно будет помещать потоки.

В любой момент времени поток может принадлежать только одной группе. Если поток помещается в группу, то он автоматически удаляется из той группы, которой принадлежал ранее.

Метод класса ThreadGroup.new создает новую группу потоков, а метод экземпляра add помещает поток в группу.

f1 = Thread.new("file1") { |file| waitfor(file) }

f2 = Thread.new("file2") { |file| waitfor(file) }

file_threads = ThreadGroup.new

file_threads.add f1

file_threads.add f2

Метод экземпляра list возвращает массив всех потоков, принадлежащих данной группе.

# Подсчитать все "живые" потоки в группе this_group.

count = 0

this_group.list.each {|x| count += 1 if x.alive? }

if count < this_group.list.size

 puts "Некоторые потоки в группе this_group уже скончались."

else

 puts "Все потоки в группе this_group живы."

end

В класс ThreadGroup можно добавить немало полезных методов. В примере ниже показаны методы для возобновления всех потоков, принадлежащих группе, для группового ожидания потоков (с помощью join) и для группового завершения потоков:

class ThreadGroup

def wakeup

 list.each { |t| t.wakeup }

end

def join

 list.each { |t| t.join if t != Thread.current }

end

def kill

 list.each { |t| t.kill }

end

end

 

13.2. Синхронизация потоков

 

Почему необходима синхронизация? Потому что из-за «чередования» операций доступ к переменным и другим сущностям может осуществляться в порядке, который не удается установить путем чтения исходного текста отдельных потоков. Два и более потоков, обращающихся к одной и той же переменной, могут взаимодействовать между собой непредвиденными способами, и отлаживать такую программу очень трудно.

Рассмотрим простой пример:

x = 0

t1 = Thread.new do

 1.upto(1000) do

  x = x + 1

 end

end

t2 = Thread.new do

 1.upto(1000) do

  x = x + 1

 end

end

t1.join

t2.join

puts x

Сначала переменная x равна 0. Каждый поток увеличивает ее значение на тысячу раз. Логика подсказывает, что в конце должно быть напечатано 2000.

Но фактический результат противоречит логике. На конкретной машине было напечатано значение 1044. В чем дело?

Мы предполагали, что инкремент целого числа — атомарная (неделимая) операция. Но это не так. Рассмотрим последовательность выполнения приведенной выше программы. Поместим поток t1 слева, а поток t2 справа. Каждый квант времени занимает одну строчку и предполагается, что к моменту, когда был сделан этот мгновенный снимок, переменная x имела значение 123.

t1                            t2

--------------------------    -----------------------------

Прочитать значение x (123)

                              Прочитать значение x (123)

Увеличить значение на 1 (124)

                              Увеличить значение на 1 (124)

Записать результат в x

                              Записать результат в x

Ясно, что каждый поток увеличивает на 1 то значение, которое видит. Но не менее ясно и то, что после увеличения на 1 обоими потоками x оказалось равно всего 124.

И это лишь самая простая из проблем, возникающих в связи с синхронизацией. Для решения более сложных приходится прилагать серьезные усилия — это предмет изучения специалистами в области теоретической информатики и математики.

 

13.2.1. Синхронизация с помощью критических секций

Простейший способ синхронизации дают критические секции. Когда поток входит в критическую секцию программы, гарантируется, что никакой другой поток не войдет в нее, пока первый не выйдет.

Если акцессору Thread.critical присвоить значение true, то выполнение других потоков не будет планироваться. В следующем примере мы переработали код предыдущего, воспользовавшись акцессором critical для определения критической области, которая защищает уязвимые участки программы.

x = 0

t1 = Thread.new do

 1.upto(1000) do

  Thread.critical = true

  x = x + 1

  Thread.critical = false

 end

end

t2 = Thread.new do

 1.upto(1000) do

  Thread.critical = true

  x = x + 1

  Thread.critical = false

 end

end

t1.join

t2.join

puts x

Теперь последовательность выполнения изменилась; взгляните, в каком порядке работают потоки t1 и t2. (Конечно, вне того участка, где происходит увеличение переменной, потоки могут чередоваться более-менее случайным образом.)

t1                            t2

----------------------------- -----------------------------

Прочитать значение x (123)

Увеличить значение на 1 (124)

Записать результат в x

                              Прочитать значение x (124)

                              Увеличить значение на 1 (125)

                              Записать результат в x

Возможны такие комбинации операций с потоками, при которых поток планируется даже тогда, когда какой-то другой поток находится в критической секции.

Простейший случай — вновь созданный поток начинает исполнение немедленно вне зависимости от того, занимает какой-то другой поток критическую секцию или нет. Поэтому описанную технику лучше применять только в самых простых ситуациях.

 

13.2.2. Синхронизация доступа к ресурсам (mutex.rb)

В качестве примера рассмотрим задачу индексирования Web-сайтов. Мы извлекаем слова из многочисленных страниц в Сети и сохраняем их в хэше. Ключом является само слово, а значением — строка, идентифицирующая документ и номер строки в этом документе.

Постановка задачи и так достаточно груба. Но мы огрубим ее еще больше, введя следующие упрощающие допущения:

• будем представлять удаленные документы в виде строк;

• ограничимся всего тремя строками (они будут «зашиты» в код);

• сетевые задержки будем моделировать «засыпанием» на случайный промежуток времени.

Взгляните на программу в листинге 13.1. Она даже не печатает получаемые данные целиком, а выводит лишь счетчик слов (не уникальный). Каждый раз при чтении или обновлении хэша мы вызываем метод hesitate, который приостанавливает поток на случайное время. Тем самым поведение программы становится недетерминированным и приближенным к реальности.

Листинг 13.1. Программа индексирования с ошибками (гонка)

@list = []

@list[0]="shoes ships\nsealing-wax"

@list[1]="cabbages kings"

@list[2]="quarks\nships\ncabbages"

def hesitate

 sleep rand(0)

end

@hash = {}

def process_list(listnum)

 lnum = 0

 @list[listnum].each do |line|

  words = line.chomp.split

  words.each do |w|

   hesitate

   if @hash[w]

    hesitate

    @hash[w] += ["#{listnum}:#{lnum}"]

   else

    hesitate

    @hash[w] = ["#{listnum}:#{lnum}"]

   end

  end

  lnum += 1

 end

end

t1 = Thread.new(0) {|num| process_list(num) }

t2 = Thread.new(1) {|num| process_list(num) }

t3 = Thread.new(2) {|num| process_list(num) }

t1.join

t2.join

t3.join

count = 0

@hash.values.each {|v| count += v.size }

puts "Всего слов: #{count} " # Может быть напечатано 7 или 8!

Здесь имеется проблема. Если ваша система ведет себя примерно так же, как наша, то программа может напечатать одно из двух значений! В наших тестах с одинаковой вероятностью печаталось 7 или 8. Если слов и списков больше, то и разброс окажется более широким.

Попробуем исправить положение с помощью мьютекса, который будет контролировать доступ к разделяемому ресурсу. (Слово «mutex» — это сокращение от mutual exclusion, «взаимная блокировка».)

Обратимся к листингу 13.2. Библиотека Mutex позволяет создавать мьютексы и манипулировать ими. Мы можем захватить (lock) мьютекс перед доступом к хэшу и освободить (unlock) его по завершении операции.

Листинг 13.2. Программа индексирования с мьютексом

require 'thread.rb'

@list = []

@list[0]="shoes ships\nsealing-wax"

@list[1]="cabbages kings"

@list[2]="quarks\nships\ncabbages"

def hesitate

 sleep rand(0)

end

@hash = {}

@mutex = Mutex.new

def process_list(listnum)

 lnum = 0

 @list[listnum].each do |line|

  words = line.chomp.split

  words.each do |w|

   hesitate

   @mutex.lock

   if @hash[w]

    hesitate

    @hash[w] += ["#{listnum}:#{lnum}"]

   else

    hesitate

    @hash[w] = ["#{listnum}:#{lnum}"]

   end

   @mutex.unlock

  end

  lnum += 1

 end

end

t1 = Thread.new(0) {|num| process_list(num) }

t2 = Thread.new(1) {|num| process_list(num) }

t3 = Thread.new(2) {|num| process_list(num) }

t1.join

t2.join

t3.join

count = 0

@hash.values.each {|v| count += v.size }

puts "Всего слов: #{count} " # Всегда печатается 8!

Отметим, что помимо метода lock в классе Mutex есть также метод try_lock. Он отличается от lock тем, что если мьютекс уже захвачен другим потоком, то он не дожидается освобождения, а сразу возвращает false.

require 'thread'

mutex = Mutex.new

t1 = Thread.new { mutex.lock; sleep 30 }

sleep 1

t2 = Thread.new do

 if mutex.try_lock

  puts "Захватил"

 else

  puts "He сумел захватить" # Печатается немедленно.

 end

end

sleep 2

Эта возможность полезна, если поток не хочет приостанавливать выполнение. Есть также метод synchronize, который захватывает мьютекс, а потом автоматически освобождает его.

mutex = Mutex.new

mutex.synchronize do

 # Любой код, нуждающийся в защите...

end

Существует еще библиотека mutex_m, где определен модуль Mutex_m, который можно подмешивать к классу (или использовать для расширения объекта). У такого расширенного объекта будут все методы мьютекса, так что он сам может выступать в роли мьютекса.

require 'mutex_m'

class MyClass

 include Mutex_m

 # Теперь любой объект класса MyClass может вызывать

 # методы lock, unlock, synchronize...

 # Внешние объекты также могут вызывать эти

 # методы для объекта MyClass.

end

 

13.2.3. Предопределенные классы синхронизированных очередей

В библиотеке thread.rb есть пара классов, которые иногда бывают полезны. Класс Queue реализует безопасную относительно потоков очередь, доступ к обоим концам которой синхронизирован. Это означает, что разные потоки могут, ничего не опасаясь, работать с такой очередью. Класс SizedQueue отличается от предыдущего тем, что позволяет ограничить размер очереди (число элементов в ней).

Оба класса имеют практически один и тот же набор методов, поскольку SizedQueue наследует Queue. Правда, в подклассе определен еще акцессор max, позволяющий получить и установить максимальный размер очереди.

buff = SizedQueue.new(25)

upper1 = buff.max #25

# Увеличить размер очереди...

buff.max = 50

upper2 = buff.max # 50

В листинге 13.3 приведено решение задачи о производителе и потребителе. Для производителя задержка (аргумент sleep) чуть больше, чем для потребителя, чтобы единицы продукции «накапливались».

Листинг 13.3. Задача о производителе и потребителе

require 'thread'

buffer = SizedQueue.new(2)

producer = Thread.new do

 item = 0

 loop do

  sleep rand 0

  puts "Производитель произвел #{item}"

  buffer.enq item

  item += 1

 end

end

consumer = Thread.new do

 loop do

  sleep (rand 0)+0.9

  item = buffer.deq

  puts "Потребитель потребил #{item}"

  puts " ожидает = #{buffer.num_waiting}"

 end

end

sleep 60 # Работать одну минуту, потом завершить оба потока.

Чтобы поместить элемент в очередь и извлечь из нее, рекомендуется применять соответственно методы enq и deq. Можно было бы для помещения в очередь пользоваться также методом push, а для извлечения — методами pop и shift, но их названия не так мнемоничны в применении к очередям.

Метод empty? проверяет, пуста ли очередь, а метод clear опустошает ее. Метод size (и его синоним length) возвращает число элементов в очереди.

# Предполагается, что другие потоки не мешают...

buff = Queue.new

buff.enq "one"

buff.enq "two"

buff.enq "three"

n1 = buff.size      # 3

flag1 = buff.empty? # false

buff.clear

n2 = buff.size      # 0

flag2 = buff.empty? # true

Метод num_waiting возвращает число потоков, ожидающих доступа к очереди. Если размер очереди не ограничен, то это потоки, ожидающие возможности удалить элементы; для ограниченной очереди включаются также потоки, пытающиеся добавить элементы.

Необязательный параметр non_block метода deq в классе Queue по умолчанию равен false. Если же он равен true, по при попытке извлечь элемент из пустой очереди он не блокирует поток, а возбуждает исключение ThreadError.

 

13.2.4. Условные переменные

Условная переменная — это, по существу, очередь потоков. Они используются в сочетании с мьютексами для лучшего управления синхронизацией потоков.

Условная переменная всегда ассоциируется с каким-то мьютексом. Ее назначение — освободить мьютекс до тех пор, пока не начнет выполняться определенное условие. Представьте себе ситуацию, когда поток захватил мьютекс, но не готов продолжать выполнение. Тогда он может заснуть под контролем условной переменной, ожидая, что будет разбужен, когда условие станет истинным.

Важно понимать, что пока поток ждет условную переменную, мьютекс свободен, поэтому другие потоки могут получить доступ к защищенному им ресурсу. А как только другой поток сигнализирует этой переменной, ожидающий поток пробуждается и пытается вновь захватить мьютекс.

Рассмотрим несколько искусственный пример в духе задачи об обедающих философах. Представьте себе, что вокруг стола сидят три скрипача, ожидающих своей очереди поиграть. Но у них есть всего две скрипки и один смычок. Понятно, что скрипач сможет играть, только если одновременно завладеет одной из скрипок и смычком.

Мы поддерживаем счетчики свободных скрипок и смычков. Когда скрипач хочет получить скрипку и смычок, он должен ждать их освобождения. В программе ниже мы защитили проверку условия мьютексом и под его защитой ждем скрипку и смычок порознь. Если скрипка или смычок заняты, поток засыпает. Он не владеет мьютексом до тех пор, пока другой поток не просигнализирует о том, что ресурс свободен. В этот момент первый поток просыпается и снова захватывает мьютекс.

Код представлен в листинге 13.4.

Листинг 13.4. Три скрипача

require 'thread'

@music = Mutex.new

@violin = ConditionVariable.new

@bow = ConditionVariable.new

@violins_free = 2

@bows_free = 1

def musician(n)

 loop do

  sleep rand(0)

  @music.synchronize do

   @violin.wait(@music) while @violins_frее == 0

   @violins_free -= 1

   puts "#{n} владеет скрипкой"

   puts "скрипок #@violins_frее, смычков #@bows_free"

   @bow.wait(@music) while @bows_free == 0

   @bows_free -= 1

   puts "#{n} владеет смычком"

   puts "скрипок #@violins_free, смычков #@bows_free"

  end

  sleep rand(0)

  puts "#{n}: (...играет...)"

  sleep rand(0)

  puts "#{n}: Я закончил."

  @music.synchronize do

   @violins_free += 1

   @violin.signal if @violins_free == 1

   @bows_free += 1

   @bow.signal if @bows_free == 1

  end

 end

end

threads = []

3.times {|i| threads << Thread.new { musician(i) } }

threads.each {|t| t.join }

Мы полагаем, что это решение никогда не приводит к тупиковой ситуации, хотя доказать этого не сумели. Но интересно отметить, что описанный алгоритм не справедливый. В наших тестах оказалось, что первый скрипач играет чаще двух остальных, а второй чаще третьего. Выяснение причин такого поведения и его исправление мы оставляем читателю в качестве интересного упражнения.

 

13.2.5. Другие способы синхронизации

Еще один механизм синхронизации - это монитор, который в Ruby реализован в библиотеке monitor.rb. Это более развитый по сравнению с мьютексом механизм, основное отличие состоит в том, что захваты одного и того же мьютекса не могут быть вложенными, а монитора — могут.

Тривиальный случай возникновения такой ситуации вряд ли возможен. В самом деле, кто станет писать такой код:

@mutex = Mutex.new

@mutex.synchronize do

 @mutex.synchronize do

  #...

 end

end

Но нечто подобное может произойти в сложной программе (или при рекурсивном вызове метода). Какова бы ни была причина, последствием будет тупиковая ситуация. Уход от нее — одно из достоинств модуля-примеси Monitor.

@mutex = Mutex.new

def some_method

 @mutex.synchronize do

  #...

  some_other_method # Тупиковая ситуация!

 end

end

def some_other_method

 @mutex.synchronize do

  #...

 end

end

Модуль-примесь Monitor обычно применяется для расширения объекта. Для создания условной переменной предназначен метод new_cond.

Класс ConditionVariable в библиотеке monitor.rb дополнен по сравнению с определением в библиотеке thread. У него есть методы wait_until и wait_while, которые блокируют поток в ожидании выполнения условия. Кроме того, возможен тайм-аут при ожидании, поскольку у метода wait имеется параметр timeout, равный количеству секунд (по умолчанию nil).

Поскольку примеры работы с потоками у нас кончаются, то в листинге 13.5 мы предлагаем реализацию классов Queue и SizedQueue с помощью монитора. Код приводится с разрешения автора, Шуго Маэда (Shugo Maeda).

Листинг 13.5. Реализация класса Queue с помощью монитора

# Автор: Shugo Maeda

require 'monitor'

class Queue

 def initialize

  @que = []

  @monitor = Monitor.new

  @empty_cond = @monitor.new_cond

 end

 def enq(obj)

  @monitor.synchronize do

   @que.push(obj)

   @empty_cond.signal

  end

 end

 def deq

  @monitor.synchronize do

   while @que.empty?

    @empty_cond.wait

   end

   return @que.shift

  end

 end

end

class SizedQueue < Queue

 attr :max

 def initialize(max)

  super()

  @max = max

  @full_cond = @monitor.new_cond

 end

 def enq(obj)

  @monitor.synchronize do

   while @que.length >= @max

    @full_cond.wait

   end

   super(obj)

  end

 end

 def deq

  @monitor.synchronize do

   obj = super

   if @que.length < @max

    @full_cond.signal

   end

   return obj

  end

 end

 def max=(max)

  @monitor.synchronize do

   @max = max

   @full_cond.broadcast

  end

 end

end

Еще один вариант синхронизации (двузначную блокировку со счетчиком) предлагает библиотека sync.rb. В ней определен модуль Sync_m, который можно применять вместе с ключевыми словами include и extend (как и Mutex_m). Этот модуль содержит методы locked?, shared?, exclusive?, lock, unlock и try_lock.

 

13.2.6. Тайм-аут при выполнении операций

Часто встречается ситуация, когда на выполнение операции отводится определенное максимальное время. Это позволяет избежать бесконечных циклов и более строго контролировать порядок работы. Подобная возможность очень полезна, в частности, в сетевых приложениях, где ответ от сервера может и не прийти.

Библиотека timeout.rb предлагает решение этой проблемы на основе потоков (см. листинг 13.6). С методом timeout ассоциирован выполняемый блок. Если истечет заданное число секунд, метод возбуждает исключение TimeoutError, которое можно перехватить с помощью rescue.

Листинг 13.6. Пример тайм-аута

require 'timeout.rb'

flag = false

answer = nil

begin

 timeout(5) do

  puts "Хочу печенье!"

  answer = gets.chomp

  flag = true

 end

 rescue TimeoutError

 flag = false

end

if flag

 if answer == "cookie"

  puts "Спасибо! Хрум, хрум..."

 else

  puts "Это же не печенье!"

  exit

 end

else

 puts "Эй, слишком медленно!"

 exit

end

puts "До встречи..."

 

13.2.7. Ожидание события

Часто один или несколько потоков следят за «внешним миром», а остальные выполняют полезную работу. Все примеры в этом разделе надуманные, но общий принцип они все же иллюстрируют.

В следующем примере прикладную задачу решают три потока. Четвертый поток каждые пять секунд просыпается, проверяет глобальную переменную $flag и, когда видит, что флаг поднят, пробуждает еще два потока. Это освобождает три рабочих потока от необходимости напрямую общаться с двумя другими и, возможно, от многочисленных попыток разбудить их.

$flag = false

work1 = Thread.new { job1() }

work2 = Thread.new { job2() }

work3 = Thread.new { job3() }

thread4 = Thread.new { Thread.stop; job4() }

thread5 = Thread.new { Thread.stop; job5() }

watcher = Thread.new do

 loop do

  sleep 5

  if $flag

   thread4.wakeup

   thread5.wakeup

   Thread.exit

  end

 end

end

Если в какой-то момент выполнения метода job, переменная $flag станет равной true, то в течение пяти секунд после этого потоки thread4 и thread5 гарантированно запустятся. После этого поток watcher завершается.

В следующем примере мы ждем создания файла. Каждые 30 секунд проверяется его существование, и как только файл появится, мы запускаем новый поток. Тем временем остальные потоки занимаются своим делом. На самом деле ниже мы наблюдаем за тремя разными файлами.

def waitfor(filename)

 loop do

  if File.exist? filename

   file_processor = Thread.new { process_file(filename) }

   Thread.exit

  else

   sleep 30

  end

 end

end

waiter1 = Thread.new { waitfor("Godot") }

sleep 10

waiter2 = Thread.new { waitfor("Guffman") }

sleep 10

headwaiter = Thread.new { waitfor("head") }

# Основной поток занимается другими делами...

Есть много ситуаций, когда поток должен ожидать внешнего события (например, в сетевых приложениях так бывает, когда сервер на другом конце соединения работает медленно или ненадежно).

 

13.2.8. Продолжение обработки во время ввода/вывода

Часто приложению приходится выполнять одну или более длительных операций ввода/вывода. Прежде всего, речь идет о вводе данных с клавиатуры, поскольку человек печатает куда медленнее, чем вращается диск. Это время можно употребить на пользу с помощью потоков.

Возьмем, к примеру, шахматную программу, которая должна ждать, пока человек сделает ход. Конечно, мы можем изложить только сам принцип, не вдаваясь в технические детали.

Предположим, что итератор predict_move генерирует вероятные ходы человека (и ответные ходы программы). Тогда в момент, когда человек сделает ход, не исключено, что у компьютера уже будет готов ответ.

scenario = {} # Хэш ход-ответ.

humans_turn = true

thinking_ahead = Thread.new(board) do

 predict_move do |m|

  scenario[m] = my_response(board,m)

  Thread.exit if humans_turn == false

 end

end

human_move = get_human_move(board)

humans_turn = false # Остановить поток.

# Теперь можно посмотреть, нет ли в хэше scenario хода,

# сделанного пользователем...

Конечно, настоящие шахматные программы работают не так.

 

13.2.9. Реализация параллельных итераторов

Предположим, что нужно параллельно обходить несколько объектов, то есть для каждого объекта найти первый элемент, потом второй, потом третий и т.д.

Рассмотрим следующий пример. Пусть compose — имя магического метода, который выполняет композицию итераторов. Допустим еще, что у каждого объекта есть стандартный итератор each и что каждый объект возвращает по одному элементу на каждой итерации.

arr1 = [1, 2, 3, 4]

arr2 = [5, 10, 15, 20]

compose(arr1, arr2) {|a,b| puts "#{а} и #{b}" }

# Должно быть напечатано:

# 1 и 5

# 2 и 10

# 3 и 15

# 4 и 20

Можно было бы, конечно, использовать для этой цели zip. Но если нужно более элегантное решение, при котором все элементы не будут храниться одновременно, то без потоков не обойтись. Такое решение представлено в листинге 13.7.

Листинг 13.7. Параллельные итераторы

def compose(*objects)

 threads = []

 for obj in objects do

  threads << Thread.new(obj) do |myobj|

   me = Thread.current

   me[:queue] = []

   myobj.each {|x| me[:queue].push(x) }

  end

 end

 list = [0]                     # Фиктивное значение, отличное от nil.

 while list.nitems > 0 do       # Еще есть не nil.

  list = []

  for thr in threads

   list << thr[:queue].shift    # Удалить по одному из каждого.

  end

  yield list if list.nitems > 0 # He вызывать yield, если все равны nil.

 end

end

x = [1, 2, 3, 4, 5, 6, 7, 8]

y = "    первый\n    второй\n    третий\n четвертый\n    пятый\n"

z = %w[a b с d e f]

compose(x, у, z) {|a,b,c| p [a, b, c] }

# Выводится:

# [1, "    первый\n", "a"]

# [2, "    второй\n", "b"]

# [3, "    третий\n", "c"]

# [4, " четвертый\n", "d"]

# [5, "     пятый\n", "e"]

# [6, nil, "f"]

# [7, nil, nil]

# [8, nil, nil]

Обратите внимание: мы не предполагаем, что все объекты имеют одно и то же число элементов. Если один итератор доходит до конца раньше остальных, то он будет генерировать значения nil до тех пор, пока не закончит работу «самый длинный» итератор.

Конечно, можно написать и более общий метод, который на каждой итерации будет обрабатывать более одного элемента. (В конце концов, не все итераторы возвращают по одному значению за раз.) Можно было бы в первом параметре передавать число значений для каждого итератора.

Можно также пользоваться произвольными итераторами (а не только стандартным each). Их имена можно было бы передавать в виде строк, а вызывать с помощью метода send. Много чего еще можно придумать.

Впрочем, мы полагаем, что приведенного кода достаточно для большинства целей. Вариации на эту тему оставляем читателю в качестве упражнения.

 

13.2.10. Параллельное рекурсивное удаление

Забавы ради напишем код, который будет удалять дерево каталогов. Процедура рекурсивного удаления использует потоки. Как только обнаруживается очередной подкаталог, мы запускаем новый поток, который будет обходить его и удалять содержимое.

Созданные в ходе работы программы потоки хранятся в массиве threads. Поскольку это локальная переменная, у каждого потока будет собственная копия массива. Раз к ней может обращаться всего один поток, синхронизировать доступ не надо.

Отметим также, что в блок потока передается полное имя файла fullname, чтобы не нужно было беспокоиться по поводу того, что поток обращается к переменной, которую кто-то еще изменяет. Поток делает для себя локальную копию fn этой переменной.

Прежде чем удалять очередной каталог, мы должны дождаться завершения всех созданных в процессе его обхода потоков.

def delete_all(dir)

 threads = []

 Dir.foreach(dir) do |e|

  next if [".",".."].include? e # Пропустить . и ..

  fullname = dir + "/" + e

  if FileTest.directory?(fullname)

   threads << Thread.new(fullname) {|fn| delete_all(fn) }

  else

   File.delete(fullname)

  end

 end

 threads.each { |t| t.join }

 Dir.delete(dir)

end

delete_all("/tmp/stuff")

Будет ли работать такая программа быстрее, чем ее вариант без потоков? В наших тестах получалось по-разному. Возможно, это зависит от операционной системы и структуры конкретного каталога — глубины, количества файлов и т.д.

 

13.3. Заключение

Как было сказано, в Ruby не используются платформенные потоки. Программа не станет работать быстрее при наличии нескольких процессоров, но некоторого распараллеливания работы достичь все же можно. Потоки полезны во многих случаях» но писать и отлаживать многопоточную программу довольно трудно, особенно если для получения правильного результата приходится применять изощренные способы синхронизации.

Для синхронизации Ruby предоставляет такие классы, как Mutex, Monitor и ConditionVariable. Имеются также безопасные относительно потоков классы очередей Queue и SizedQueue.

В главе 14 мы перейдем от обсуждения техники программирования к решению конкретных задач, а именно сценариев системного администрирования.