Потоки еще иногда называют облегченными процессами. Это просто способ обеспечить параллельное выполнение без накладных расходов, связанных с контекстным переключением между процессами. (Впрочем, общего согласия по поводу того, что такое поток, нет, поэтому мы не будем углубляться в данный вопрос.)
В Ruby потоки определены на пользовательском уровне и не зависят от операционной системы. Они работают в DOS так же, как и в UNIX. Но, конечно, это снижает производительность, а на сколько именно, зависит от операционной системы.
Потоки полезны, например, тогда, когда некоторые части программы могут работать независимо друг от друга. Применяются они и в тех случаях, когда приложение тратит много времени на ожидание события. Часто, пока один поток ждет, другой может выполнять полезную работу.
С другой стороны, у потоков есть и недостатки. Всегда надо взвешивать, оправданно ли их применение в конкретном случае. К тому же, иногда доступ к ресурсу принципиально должен осуществляться строго последовательно, поэтому потоки не дадут никакого выигрыша. И, наконец, бывает так, что накладные расходы на синхронизацию доступа к глобальным ресурсам превышают экономию, достигаемую за счет использования нескольких потоков.
По этой и ряду других причин некоторые авторитеты вообще рекомендуют держаться подальше от многопоточного программирования. Действительно, такие программы сложны и подвержены ошибкам, которые трудно отлаживать. Но мы оставим читателю самому решать, когда стоит применять эту технику.
Проблемы, связанные с несинхронизированными потоками, хорошо известны. При одновременном доступе к глобальным данным со стороны нескольких потоков данные могут быть запорчены. Если один поток делает какое-то допущение о том, что успел выполнить другой поток, возможна гонка (race condition); обычно это приводит к «недетерминированному» коду, который дает разные результаты при каждом запуске. Наконец, существует опасность тупиковой ситуации, когда ни один поток не может продолжить выполнение, поскольку ожидает ресурс, занятый другим потоком. Код, написанный так, что ни одна из этих проблем не возникает, называется безопасным относительно потоков.
Не все в Ruby безопасно относительно потоков, но имеются методы синхронизации, которые позволяют контролировать доступ к переменным и ресурсам, защищать критические секции программы и избегать тупиковых ситуаций. Мы рассмотрим их в этой главе и проиллюстрируем на примерах.
13.1. Создание потоков и манипулирование ими
К числу основных операций над потоками относятся создание потока, передача ему входной информации и получение результатов, останов потока и т.д. Можно получить список запущенных потоков, опросить состояние потока и выполнить ряд других проверок.
Ниже представлен обзор основных операций.
13.1.1. Создание потоков
Создать поток просто: достаточно вызвать метод new и присоединить блок, который будет исполняться в потоке.
thread = Thread.new do
# Предложения, исполняемые в потоке...
end
Возвращаемое значение — объект типа Thread. Главный поток программы может использовать его для управления вновь созданным потоком.
А если нужно передать потоку параметры? Достаточно передать их методу Thread.new, который, в свою очередь, передаст их блоку.
a = 4
b = 5
с = 6
thread2 = Thread.new(а,b,с) do |a, x, y|
# Манипуляции с a, x и y.
end
# Если переменная а будет изменена новым потоком,
# то главный поток не получит об этом никакого уведомления.
Параметры блока, являющиеся ссылками на существующие переменные, практически неотличимы от самих переменных. Поэтому, например, переменная а в каком-то смысле «опасна», что и отражено в комментарии.
Поток может также обращаться к переменным из той же области видимости, в которой был создан. Ясно, что без синхронизации это может стать источником проблем. Главный и любой другой поток могут изменять такую переменную независимо друг от друга, и результаты подобных действий непредсказуемы.
x = 1
y = 2
thread3 = Thread.new do
# Этот поток может манипулировать переменными x and y
# из внешней области видимости, но это не всегда безопасно.
sleep(rand(0)) # Спать в течение случайно выбранного времени
# (меньше секунды).
x = 3
end
sleep(rand(0))
puts x
# Если запустить эту программу несколько раз подряд, то может быть
# напечатано как 1, так и 3!
У метода new есть синоним fork — это имя выбрано по аналогии с хорошо известным системным вызовом в UNIX.
13.1.2. Доступ к локальным переменным потока
Мы знаем об опасности доступа из потока к переменным, определенным вне его области видимости, но мы также знаем, что у потока могут быть локальные данные. А что делать, если поток хочет «обнародовать» часть принадлежащих ему данных?
Для этой цели предусмотрен специальный механизм. Если объект Thread рассматривать как хэш, то к локальным данным потока можно обратиться из любого места в области видимости этого объекта. Мы не хотим сказать, что так можно обратиться к настоящим локальным переменным; это допустимо лишь для доступа к именованным данным, своим для каждого потока.
Существует также метод key?, который сообщает, используется ли указанное имя в данном потоке.
Внутри потока к таким данным тоже следует обращаться, как к хэшу. Метод Thread.current позволяет сделать запись чуть менее громоздкой.
thread = Thread.new do
t = Thread.current
t[:var1] = "Это строка"
t[:var2] = 365
end
# Доступ к локальным данным потока извне...
x = thread[:var1] # "Это строка"
y = thread[:var2] # 365
has_var2 = thread.key?("var2") # true
has_var3 = thread.key?("var3") # false
Отметим, что эти данные доступны другим потокам даже после того, их владелец завершил работу (как в данном случае).
Помимо символа (см. выше), для идентификации локальной переменной потока можно употреблять и строки.
thread = Thread.new do
t = Thread.current
t["var3"] = 25
t[:var4] = "foobar"
end
a = thread[:var3] = 25
b = thread["var4"] = "foobar"
He путайте эти специальные имена с настоящими локальными переменными. В следующем фрагменте разница видна более отчетливо:
thread = Thread.new do
t = Thread.current
t["var3"] = 25
t[:var4] = "foobar"
var3 = 99 # Настоящие локальные переменные
var4 = "zorch" # (извне недоступны)
end
a = thread[:var3] # 25
b = thread["var4"] # "foobar"
И еще отметим, что ссылку на объект (на настоящую локальную переменную) внутри потока можно использовать для сокращенной записи. Это справедливо, если вы сохраняете одну и ту же ссылку, а не создаете новую.
thread = Thread.new do
t = Thread.current
x = "nXxeQPdMdxiBAxh"
t[:my_message] = x
x.reverse!
x.delete! "x"
x.gsub!(/[A-Z]/,"")
# С другой стороны, присваивание создает новый объект,
# поэтому "сокращение" становится бесполезным...
end
а = thread[:my_message] # "hidden"
Ясно, что сокращение не будет работать и в том случае, когда вы имеете дело с объектами наподобие Fixnum, которые хранятся как непосредственные значения, а не ссылки.
13.1.3. Опрос и изменение состояния потока
В классе Thread есть несколько полезных методов класса. Метод list возвращает массив «живых» потоков, метод main возвращает ссылку на главный поток программы, который породил все остальные, а метод current позволяет потоку идентифицировать самого себя.
t1 = Thread.new { sleep 100 }
t2 = Thread.new do
if Thread.current == Thread.main
puts "Это главный поток." # HE печатается,
end
1.upto(1000) { sleep 0.1 }
end
count = Thread.list.size # 3
if Thread.list.include ?(Thread.main)
puts "Главный поток жив." # Печатается всегда!
end
if Thread.current == Thread.main
puts "Я главный поток." # Здесь печатается...
end
Методы exit, pass, start, stop и kill служат для управления выполнением потоков (как изнутри, так и извне):
# в главном потоке...
Thread.kill(t1) # Завершить этот поток.
Thread.pass # Передать управление t2.
t3 = Thread.new do
sleep 20
Thread.exit # Выйти из потока.
puts "Так не бывает!" # Никогда не выполняется.
end
Thread.kill(t2) # Завершить t2.
# Выйти из главного потока (все остальные тоже завершаются).
Thread.exit
Отметим, что не существует метода экземпляра stop, поэтому поток может приостановить собственное выполнение, но не выполнение другого потока.
Существуют различные методы для опроса состояния потока. Метод экземпляра alive? сообщает, является ли данный поток «живым» (не завершил выполнение), а метод stop? — находится ли он в состоянии «приостановлен».
count = 0
t1 = Thread.new { loop { count += 1 } }
t2 = Thread.new { Thread.stop }
sleep 1
flags = [t1.alive?, # true
t1.stop?, # false
t2.alive?, # true
t2.stop?] # true
Получить состояние потока позволяет метод status. Он возвращает значение "run", если поток выполняется; "sleep" — если он приостановлен, спит или ожидает результата ввода/вывода; false — если поток нормально завершился, и nil — если поток завершился в результате исключения.
t1 = Thread.new { loop {} }
t2 = Thread.new { sleep 5 }
t3 = Thread.new { Thread.stop }
t4 = Thread.new { Thread.exit }
t5 = Thread.new { raise "exception" }
s1 = t1.status # "run"
s2 = t2.status # "sleep"
s3 = t3.status # "sleep"
s4 = t4.status # false
s5 = t5.status # nil
Глобальную переменную $SAFE можно установить по-разному в разных потоках. Стало быть, она вовсе не является глобальной, но стоит ли жаловаться на это, если она позволяет разным потокам работать с разным уровнем безопасности? Метод safe_level возвращает текущий уровень безопасности потока.
t1 = Thread.new { $SAFE = 1; sleep 5 }
t2 = Thread.new { $SAFE = 3; sleep 5 }
sleep 1
lev0 = Thread.main.safe_level # 0
lev1 = t1.safe_level # 1
lev2 = t2.safe_level # 3
Метод доступа priority позволяет узнать и изменить приоритет потока:
t1 = Thread.new { loop { sleep 1 } }
t2 = Thread.new { loop { sleep 1 } }
t2.priority = 3 # Установить для потока t2 приоритет 3
p1 = t1.priority # 0
p2 = t2.priority # 3
Поток с большим приоритетом будет чаще получать процессорное время. Специальный метод pass позволяет передать управление планировщику. Иными словами, поток просто уступает свой временной квант, но не приостанавливается и не засыпает.
t1 = Thread.new do
puts "alpha"
Thread.pass
puts "beta"
end
t2 = Thread.new do
puts "gamma"
puts "delta"
end
t1.join
t2.join
В этом искусственном примере вызов Thread.pass приводит к печати строк в следующем порядке: alpha gamma delta beta. Без него было бы напечатано alpha beta gamma delta. Конечно, этот механизм следует использовать не для синхронизации, а только для экономного расходования процессорного времени.
Выполнение приостановленного потока можно возобновить методами методами run или wakeup:
t1 = Thread.new do
Thread.stop
puts "Здесь есть изумруд."
end
t2 = Thread.new do
Thread.stop
puts "Вы находитесь в точке Y2."
end
sleep 1
t1.wakeup
t2.run
Между этими методами есть тонкое различие. Метод wakeup изменяет состояние потока, так что он становится готовым к выполнению, но не запускает его немедленно. Метод же run пробуждает поток и сразу же планирует его выполнение.
В данном случае t1 просыпается раньше t2, но t2 планируется первым, что приводит к следующему результату:
Вы находитесь в точке Y2.
Здесь есть изумруд.
Конечно, было бы неосмотрительно реализовывать синхронизацию на основе этого механизма.
Метод экземпляра raise возбуждает исключение в потоке, от имени которого вызван. (Этот метод необязательно вызывать в том потоке, которому адресовано исключение.)
factorial1000 = Thread.new do
begin
prod = 1
1.upto(1000) {|n| prod *= n }
puts "1000! = #{prod}"
rescue
# Ничего не делать...
end
end
sleep 0.01 # На вашей машине значение может быть иным.
if factorial1000.alive?
factorial1000.raise("Стоп!")
puts "Вычисление было прервано!"
else
puts "Вычисление успешно завершено."
end
Поток, запущенный в предыдущем примере, пытался вычислить факториал 1000. Если для этого не хватило одной сотой секунды, то главный поток завершит его. Как следствие, на относительно медленной машине будет напечатано сообщение «Вычисление было прервано!» Что касается части rescue внутри потока, то в ней мог бы находиться любой код, как, впрочем, и всегда.
13.1.4. Назначение рандеву (и получение возвращенного значения)
Иногда главный поток хочет дождаться завершения другого потока. Для этой цели предназначен метод join:
t1 = Thread.new { do_something_long() }
do_something_brief()
t1.join # Ждать завершения t1.
Отметим, что вызывать метод join необходимо, если нужно дождаться завершения другого потока. В противном случае главный поток завершится, а вместе с ним и все остальные. Например, следующий код никогда не напечатал бы окончательный ответ, не будь в конце вызова join:
meaning_of_life = Thread.new do
puts "Смысл жизни заключается в..."
sleep 10
puts 42
end
sleep 9
meaning_of_life.join
Существует полезная идиома, позволяющая вызвать метод join для всех «живых» потоков, кроме главного (ни один поток, даже главный, не может вызывать join для самого себя).
Thread.list.each { |t| t.join if t != Thread.main }
Конечно, любой поток, а не только главный, может вызвать join для любого другого потока. Если главный поток и какой-то другой попытаются вызвать join друг для друга, возникнет тупиковая ситуация. Интерпретатор обнаружит это и завершит программу.
thr = Thread.new { sleep 1; Thread.main.join }
thr.join # Тупиковая ситуация!
С потоком связан блок, который может возвращать значение. Следовательно, и сам поток может возвращать значение. Метод value неявно вызывает join и ждет, пока указанный поток завершится, а потом возвращает значение последнего вычисленного в потоке выражения.
max = 10000
thr = Thread.new do
sum = 0
1.upto(max) { |i| sum += i }
sum
end
guess = (max*(max+1))/2
print "Формула "
if guess == thr.value
puts "правильна."
else
puts "неправильна."
end
13.1.5. Обработка исключений
Что произойдет, если в потоке возникнет исключение? Как выясняется, поведение можно сконфигурировать заранее.
Существует флаг abort_on_exception, который работает как на уровне класса, так и на уровне экземпляра. Он реализован в виде метода доступа (то есть позволяет читать и устанавливать атрибут) на обоих уровнях. Если abort_on_exception для некоторого потока равен true, то при возникновении в этом потоке исключения будут завершены и все остальные потоки.
Thread.abort_on_exception = true
t1 = Thread.new do
puts "Привет!"
sleep 2
raise "some exception"
puts "Пока!"
end
t2 = Thread.new { sleep 100 }
sleep 2
puts "Конец"
В этом примере флаг abort_on_exception установлен в true на уровне системы в целом (отменяя подразумеваемое по умолчанию значение). Следовательно, когда в потоке t1 возникает исключение, завершаются и t1, и главный поток. Печатается только слово «Привет!».
В следующем примере эффект такой же:
t1 = Thread.new do
puts "Привет!"
sleep 2
raise "some exception"
puts "Пока!"
end
t1.abort_on_exception = true
t2 = Thread.new { sleep 100 }
sleep 2
puts "Конец"
А вот в следующем оставлено принимаемое по умолчанию значение false, и мы наконец-то видим слово «Конец», печатаемое главным потоком (слова «Пока!» мы не увидим никогда, поскольку поток t1 при возникновении исключения завершается безусловно).
t1 = Thread.new do
puts "Привет!"
sleep 2
raise "some exception"
puts "Пока!"
end
t2 = Thread.new { sleep 100 }
sleep 2
puts "Конец"
# Выводится:
Привет!
Конец
13.1.6. Группы потоков
Группа потоков — это механизм управления логически связанными потоками. По умолчанию все потоки принадлежат группе Default (это константа класса). Но если создать новую группу, то в нее можно будет помещать потоки.
В любой момент времени поток может принадлежать только одной группе. Если поток помещается в группу, то он автоматически удаляется из той группы, которой принадлежал ранее.
Метод класса ThreadGroup.new создает новую группу потоков, а метод экземпляра add помещает поток в группу.
f1 = Thread.new("file1") { |file| waitfor(file) }
f2 = Thread.new("file2") { |file| waitfor(file) }
file_threads = ThreadGroup.new
file_threads.add f1
file_threads.add f2
Метод экземпляра list возвращает массив всех потоков, принадлежащих данной группе.
# Подсчитать все "живые" потоки в группе this_group.
count = 0
this_group.list.each {|x| count += 1 if x.alive? }
if count < this_group.list.size
puts "Некоторые потоки в группе this_group уже скончались."
else
puts "Все потоки в группе this_group живы."
end
В класс ThreadGroup можно добавить немало полезных методов. В примере ниже показаны методы для возобновления всех потоков, принадлежащих группе, для группового ожидания потоков (с помощью join) и для группового завершения потоков:
class ThreadGroup
def wakeup
list.each { |t| t.wakeup }
end
def join
list.each { |t| t.join if t != Thread.current }
end
def kill
list.each { |t| t.kill }
end
end
13.2. Синхронизация потоков
Почему необходима синхронизация? Потому что из-за «чередования» операций доступ к переменным и другим сущностям может осуществляться в порядке, который не удается установить путем чтения исходного текста отдельных потоков. Два и более потоков, обращающихся к одной и той же переменной, могут взаимодействовать между собой непредвиденными способами, и отлаживать такую программу очень трудно.
Рассмотрим простой пример:
x = 0
t1 = Thread.new do
1.upto(1000) do
x = x + 1
end
end
t2 = Thread.new do
1.upto(1000) do
x = x + 1
end
end
t1.join
t2.join
puts x
Сначала переменная x равна 0. Каждый поток увеличивает ее значение на тысячу раз. Логика подсказывает, что в конце должно быть напечатано 2000.
Но фактический результат противоречит логике. На конкретной машине было напечатано значение 1044. В чем дело?
Мы предполагали, что инкремент целого числа — атомарная (неделимая) операция. Но это не так. Рассмотрим последовательность выполнения приведенной выше программы. Поместим поток t1 слева, а поток t2 справа. Каждый квант времени занимает одну строчку и предполагается, что к моменту, когда был сделан этот мгновенный снимок, переменная x имела значение 123.
t1 t2
-------------------------- -----------------------------
Прочитать значение x (123)
Прочитать значение x (123)
Увеличить значение на 1 (124)
Увеличить значение на 1 (124)
Записать результат в x
Записать результат в x
Ясно, что каждый поток увеличивает на 1 то значение, которое видит. Но не менее ясно и то, что после увеличения на 1 обоими потоками x оказалось равно всего 124.
И это лишь самая простая из проблем, возникающих в связи с синхронизацией. Для решения более сложных приходится прилагать серьезные усилия — это предмет изучения специалистами в области теоретической информатики и математики.
13.2.1. Синхронизация с помощью критических секций
Простейший способ синхронизации дают критические секции. Когда поток входит в критическую секцию программы, гарантируется, что никакой другой поток не войдет в нее, пока первый не выйдет.
Если акцессору Thread.critical присвоить значение true, то выполнение других потоков не будет планироваться. В следующем примере мы переработали код предыдущего, воспользовавшись акцессором critical для определения критической области, которая защищает уязвимые участки программы.
x = 0
t1 = Thread.new do
1.upto(1000) do
Thread.critical = true
x = x + 1
Thread.critical = false
end
end
t2 = Thread.new do
1.upto(1000) do
Thread.critical = true
x = x + 1
Thread.critical = false
end
end
t1.join
t2.join
puts x
Теперь последовательность выполнения изменилась; взгляните, в каком порядке работают потоки t1 и t2. (Конечно, вне того участка, где происходит увеличение переменной, потоки могут чередоваться более-менее случайным образом.)
t1 t2
----------------------------- -----------------------------
Прочитать значение x (123)
Увеличить значение на 1 (124)
Записать результат в x
Прочитать значение x (124)
Увеличить значение на 1 (125)
Записать результат в x
Возможны такие комбинации операций с потоками, при которых поток планируется даже тогда, когда какой-то другой поток находится в критической секции.
Простейший случай — вновь созданный поток начинает исполнение немедленно вне зависимости от того, занимает какой-то другой поток критическую секцию или нет. Поэтому описанную технику лучше применять только в самых простых ситуациях.
13.2.2. Синхронизация доступа к ресурсам (mutex.rb)
В качестве примера рассмотрим задачу индексирования Web-сайтов. Мы извлекаем слова из многочисленных страниц в Сети и сохраняем их в хэше. Ключом является само слово, а значением — строка, идентифицирующая документ и номер строки в этом документе.
Постановка задачи и так достаточно груба. Но мы огрубим ее еще больше, введя следующие упрощающие допущения:
• будем представлять удаленные документы в виде строк;
• ограничимся всего тремя строками (они будут «зашиты» в код);
• сетевые задержки будем моделировать «засыпанием» на случайный промежуток времени.
Взгляните на программу в листинге 13.1. Она даже не печатает получаемые данные целиком, а выводит лишь счетчик слов (не уникальный). Каждый раз при чтении или обновлении хэша мы вызываем метод hesitate, который приостанавливает поток на случайное время. Тем самым поведение программы становится недетерминированным и приближенным к реальности.
Листинг 13.1. Программа индексирования с ошибками (гонка)
@list = []
@list[0]="shoes ships\nsealing-wax"
@list[1]="cabbages kings"
@list[2]="quarks\nships\ncabbages"
def hesitate
sleep rand(0)
end
@hash = {}
def process_list(listnum)
lnum = 0
@list[listnum].each do |line|
words = line.chomp.split
words.each do |w|
hesitate
if @hash[w]
hesitate
@hash[w] += ["#{listnum}:#{lnum}"]
else
hesitate
@hash[w] = ["#{listnum}:#{lnum}"]
end
end
lnum += 1
end
end
t1 = Thread.new(0) {|num| process_list(num) }
t2 = Thread.new(1) {|num| process_list(num) }
t3 = Thread.new(2) {|num| process_list(num) }
t1.join
t2.join
t3.join
count = 0
@hash.values.each {|v| count += v.size }
puts "Всего слов: #{count} " # Может быть напечатано 7 или 8!
Здесь имеется проблема. Если ваша система ведет себя примерно так же, как наша, то программа может напечатать одно из двух значений! В наших тестах с одинаковой вероятностью печаталось 7 или 8. Если слов и списков больше, то и разброс окажется более широким.
Попробуем исправить положение с помощью мьютекса, который будет контролировать доступ к разделяемому ресурсу. (Слово «mutex» — это сокращение от mutual exclusion, «взаимная блокировка».)
Обратимся к листингу 13.2. Библиотека Mutex позволяет создавать мьютексы и манипулировать ими. Мы можем захватить (lock) мьютекс перед доступом к хэшу и освободить (unlock) его по завершении операции.
Листинг 13.2. Программа индексирования с мьютексом
require 'thread.rb'
@list = []
@list[0]="shoes ships\nsealing-wax"
@list[1]="cabbages kings"
@list[2]="quarks\nships\ncabbages"
def hesitate
sleep rand(0)
end
@hash = {}
@mutex = Mutex.new
def process_list(listnum)
lnum = 0
@list[listnum].each do |line|
words = line.chomp.split
words.each do |w|
hesitate
@mutex.lock
if @hash[w]
hesitate
@hash[w] += ["#{listnum}:#{lnum}"]
else
hesitate
@hash[w] = ["#{listnum}:#{lnum}"]
end
@mutex.unlock
end
lnum += 1
end
end
t1 = Thread.new(0) {|num| process_list(num) }
t2 = Thread.new(1) {|num| process_list(num) }
t3 = Thread.new(2) {|num| process_list(num) }
t1.join
t2.join
t3.join
count = 0
@hash.values.each {|v| count += v.size }
puts "Всего слов: #{count} " # Всегда печатается 8!
Отметим, что помимо метода lock в классе Mutex есть также метод try_lock. Он отличается от lock тем, что если мьютекс уже захвачен другим потоком, то он не дожидается освобождения, а сразу возвращает false.
require 'thread'
mutex = Mutex.new
t1 = Thread.new { mutex.lock; sleep 30 }
sleep 1
t2 = Thread.new do
if mutex.try_lock
puts "Захватил"
else
puts "He сумел захватить" # Печатается немедленно.
end
end
sleep 2
Эта возможность полезна, если поток не хочет приостанавливать выполнение. Есть также метод synchronize, который захватывает мьютекс, а потом автоматически освобождает его.
mutex = Mutex.new
mutex.synchronize do
# Любой код, нуждающийся в защите...
end
Существует еще библиотека mutex_m, где определен модуль Mutex_m, который можно подмешивать к классу (или использовать для расширения объекта). У такого расширенного объекта будут все методы мьютекса, так что он сам может выступать в роли мьютекса.
require 'mutex_m'
class MyClass
include Mutex_m
# Теперь любой объект класса MyClass может вызывать
# методы lock, unlock, synchronize...
# Внешние объекты также могут вызывать эти
# методы для объекта MyClass.
end
13.2.3. Предопределенные классы синхронизированных очередей
В библиотеке thread.rb есть пара классов, которые иногда бывают полезны. Класс Queue реализует безопасную относительно потоков очередь, доступ к обоим концам которой синхронизирован. Это означает, что разные потоки могут, ничего не опасаясь, работать с такой очередью. Класс SizedQueue отличается от предыдущего тем, что позволяет ограничить размер очереди (число элементов в ней).
Оба класса имеют практически один и тот же набор методов, поскольку SizedQueue наследует Queue. Правда, в подклассе определен еще акцессор max, позволяющий получить и установить максимальный размер очереди.
buff = SizedQueue.new(25)
upper1 = buff.max #25
# Увеличить размер очереди...
buff.max = 50
upper2 = buff.max # 50
В листинге 13.3 приведено решение задачи о производителе и потребителе. Для производителя задержка (аргумент sleep) чуть больше, чем для потребителя, чтобы единицы продукции «накапливались».
Листинг 13.3. Задача о производителе и потребителе
require 'thread'
buffer = SizedQueue.new(2)
producer = Thread.new do
item = 0
loop do
sleep rand 0
puts "Производитель произвел #{item}"
buffer.enq item
item += 1
end
end
consumer = Thread.new do
loop do
sleep (rand 0)+0.9
item = buffer.deq
puts "Потребитель потребил #{item}"
puts " ожидает = #{buffer.num_waiting}"
end
end
sleep 60 # Работать одну минуту, потом завершить оба потока.
Чтобы поместить элемент в очередь и извлечь из нее, рекомендуется применять соответственно методы enq и deq. Можно было бы для помещения в очередь пользоваться также методом push, а для извлечения — методами pop и shift, но их названия не так мнемоничны в применении к очередям.
Метод empty? проверяет, пуста ли очередь, а метод clear опустошает ее. Метод size (и его синоним length) возвращает число элементов в очереди.
# Предполагается, что другие потоки не мешают...
buff = Queue.new
buff.enq "one"
buff.enq "two"
buff.enq "three"
n1 = buff.size # 3
flag1 = buff.empty? # false
buff.clear
n2 = buff.size # 0
flag2 = buff.empty? # true
Метод num_waiting возвращает число потоков, ожидающих доступа к очереди. Если размер очереди не ограничен, то это потоки, ожидающие возможности удалить элементы; для ограниченной очереди включаются также потоки, пытающиеся добавить элементы.
Необязательный параметр non_block метода deq в классе Queue по умолчанию равен false. Если же он равен true, по при попытке извлечь элемент из пустой очереди он не блокирует поток, а возбуждает исключение ThreadError.
13.2.4. Условные переменные
Условная переменная — это, по существу, очередь потоков. Они используются в сочетании с мьютексами для лучшего управления синхронизацией потоков.
Условная переменная всегда ассоциируется с каким-то мьютексом. Ее назначение — освободить мьютекс до тех пор, пока не начнет выполняться определенное условие. Представьте себе ситуацию, когда поток захватил мьютекс, но не готов продолжать выполнение. Тогда он может заснуть под контролем условной переменной, ожидая, что будет разбужен, когда условие станет истинным.
Важно понимать, что пока поток ждет условную переменную, мьютекс свободен, поэтому другие потоки могут получить доступ к защищенному им ресурсу. А как только другой поток сигнализирует этой переменной, ожидающий поток пробуждается и пытается вновь захватить мьютекс.
Рассмотрим несколько искусственный пример в духе задачи об обедающих философах. Представьте себе, что вокруг стола сидят три скрипача, ожидающих своей очереди поиграть. Но у них есть всего две скрипки и один смычок. Понятно, что скрипач сможет играть, только если одновременно завладеет одной из скрипок и смычком.
Мы поддерживаем счетчики свободных скрипок и смычков. Когда скрипач хочет получить скрипку и смычок, он должен ждать их освобождения. В программе ниже мы защитили проверку условия мьютексом и под его защитой ждем скрипку и смычок порознь. Если скрипка или смычок заняты, поток засыпает. Он не владеет мьютексом до тех пор, пока другой поток не просигнализирует о том, что ресурс свободен. В этот момент первый поток просыпается и снова захватывает мьютекс.
Код представлен в листинге 13.4.
Листинг 13.4. Три скрипача
require 'thread'
@music = Mutex.new
@violin = ConditionVariable.new
@bow = ConditionVariable.new
@violins_free = 2
@bows_free = 1
def musician(n)
loop do
sleep rand(0)
@music.synchronize do
@violin.wait(@music) while @violins_frее == 0
@violins_free -= 1
puts "#{n} владеет скрипкой"
puts "скрипок #@violins_frее, смычков #@bows_free"
@bow.wait(@music) while @bows_free == 0
@bows_free -= 1
puts "#{n} владеет смычком"
puts "скрипок #@violins_free, смычков #@bows_free"
end
sleep rand(0)
puts "#{n}: (...играет...)"
sleep rand(0)
puts "#{n}: Я закончил."
@music.synchronize do
@violins_free += 1
@violin.signal if @violins_free == 1
@bows_free += 1
@bow.signal if @bows_free == 1
end
end
end
threads = []
3.times {|i| threads << Thread.new { musician(i) } }
threads.each {|t| t.join }
Мы полагаем, что это решение никогда не приводит к тупиковой ситуации, хотя доказать этого не сумели. Но интересно отметить, что описанный алгоритм не справедливый. В наших тестах оказалось, что первый скрипач играет чаще двух остальных, а второй чаще третьего. Выяснение причин такого поведения и его исправление мы оставляем читателю в качестве интересного упражнения.
13.2.5. Другие способы синхронизации
Еще один механизм синхронизации - это монитор, который в Ruby реализован в библиотеке monitor.rb. Это более развитый по сравнению с мьютексом механизм, основное отличие состоит в том, что захваты одного и того же мьютекса не могут быть вложенными, а монитора — могут.
Тривиальный случай возникновения такой ситуации вряд ли возможен. В самом деле, кто станет писать такой код:
@mutex = Mutex.new
@mutex.synchronize do
@mutex.synchronize do
#...
end
end
Но нечто подобное может произойти в сложной программе (или при рекурсивном вызове метода). Какова бы ни была причина, последствием будет тупиковая ситуация. Уход от нее — одно из достоинств модуля-примеси Monitor.
@mutex = Mutex.new
def some_method
@mutex.synchronize do
#...
some_other_method # Тупиковая ситуация!
end
end
def some_other_method
@mutex.synchronize do
#...
end
end
Модуль-примесь Monitor обычно применяется для расширения объекта. Для создания условной переменной предназначен метод new_cond.
Класс ConditionVariable в библиотеке monitor.rb дополнен по сравнению с определением в библиотеке thread. У него есть методы wait_until и wait_while, которые блокируют поток в ожидании выполнения условия. Кроме того, возможен тайм-аут при ожидании, поскольку у метода wait имеется параметр timeout, равный количеству секунд (по умолчанию nil).
Поскольку примеры работы с потоками у нас кончаются, то в листинге 13.5 мы предлагаем реализацию классов Queue и SizedQueue с помощью монитора. Код приводится с разрешения автора, Шуго Маэда (Shugo Maeda).
Листинг 13.5. Реализация класса Queue с помощью монитора
# Автор: Shugo Maeda
require 'monitor'
class Queue
def initialize
@que = []
@monitor = Monitor.new
@empty_cond = @monitor.new_cond
end
def enq(obj)
@monitor.synchronize do
@que.push(obj)
@empty_cond.signal
end
end
def deq
@monitor.synchronize do
while @que.empty?
@empty_cond.wait
end
return @que.shift
end
end
end
class SizedQueue < Queue
attr :max
def initialize(max)
super()
@max = max
@full_cond = @monitor.new_cond
end
def enq(obj)
@monitor.synchronize do
while @que.length >= @max
@full_cond.wait
end
super(obj)
end
end
def deq
@monitor.synchronize do
obj = super
if @que.length < @max
@full_cond.signal
end
return obj
end
end
def max=(max)
@monitor.synchronize do
@max = max
@full_cond.broadcast
end
end
end
Еще один вариант синхронизации (двузначную блокировку со счетчиком) предлагает библиотека sync.rb. В ней определен модуль Sync_m, который можно применять вместе с ключевыми словами include и extend (как и Mutex_m). Этот модуль содержит методы locked?, shared?, exclusive?, lock, unlock и try_lock.
13.2.6. Тайм-аут при выполнении операций
Часто встречается ситуация, когда на выполнение операции отводится определенное максимальное время. Это позволяет избежать бесконечных циклов и более строго контролировать порядок работы. Подобная возможность очень полезна, в частности, в сетевых приложениях, где ответ от сервера может и не прийти.
Библиотека timeout.rb предлагает решение этой проблемы на основе потоков (см. листинг 13.6). С методом timeout ассоциирован выполняемый блок. Если истечет заданное число секунд, метод возбуждает исключение TimeoutError, которое можно перехватить с помощью rescue.
Листинг 13.6. Пример тайм-аута
require 'timeout.rb'
flag = false
answer = nil
begin
timeout(5) do
puts "Хочу печенье!"
answer = gets.chomp
flag = true
end
rescue TimeoutError
flag = false
end
if flag
if answer == "cookie"
puts "Спасибо! Хрум, хрум..."
else
puts "Это же не печенье!"
exit
end
else
puts "Эй, слишком медленно!"
exit
end
puts "До встречи..."
13.2.7. Ожидание события
Часто один или несколько потоков следят за «внешним миром», а остальные выполняют полезную работу. Все примеры в этом разделе надуманные, но общий принцип они все же иллюстрируют.
В следующем примере прикладную задачу решают три потока. Четвертый поток каждые пять секунд просыпается, проверяет глобальную переменную $flag и, когда видит, что флаг поднят, пробуждает еще два потока. Это освобождает три рабочих потока от необходимости напрямую общаться с двумя другими и, возможно, от многочисленных попыток разбудить их.
$flag = false
work1 = Thread.new { job1() }
work2 = Thread.new { job2() }
work3 = Thread.new { job3() }
thread4 = Thread.new { Thread.stop; job4() }
thread5 = Thread.new { Thread.stop; job5() }
watcher = Thread.new do
loop do
sleep 5
if $flag
thread4.wakeup
thread5.wakeup
Thread.exit
end
end
end
Если в какой-то момент выполнения метода job, переменная $flag станет равной true, то в течение пяти секунд после этого потоки thread4 и thread5 гарантированно запустятся. После этого поток watcher завершается.
В следующем примере мы ждем создания файла. Каждые 30 секунд проверяется его существование, и как только файл появится, мы запускаем новый поток. Тем временем остальные потоки занимаются своим делом. На самом деле ниже мы наблюдаем за тремя разными файлами.
def waitfor(filename)
loop do
if File.exist? filename
file_processor = Thread.new { process_file(filename) }
Thread.exit
else
sleep 30
end
end
end
waiter1 = Thread.new { waitfor("Godot") }
sleep 10
waiter2 = Thread.new { waitfor("Guffman") }
sleep 10
headwaiter = Thread.new { waitfor("head") }
# Основной поток занимается другими делами...
Есть много ситуаций, когда поток должен ожидать внешнего события (например, в сетевых приложениях так бывает, когда сервер на другом конце соединения работает медленно или ненадежно).
13.2.8. Продолжение обработки во время ввода/вывода
Часто приложению приходится выполнять одну или более длительных операций ввода/вывода. Прежде всего, речь идет о вводе данных с клавиатуры, поскольку человек печатает куда медленнее, чем вращается диск. Это время можно употребить на пользу с помощью потоков.
Возьмем, к примеру, шахматную программу, которая должна ждать, пока человек сделает ход. Конечно, мы можем изложить только сам принцип, не вдаваясь в технические детали.
Предположим, что итератор predict_move генерирует вероятные ходы человека (и ответные ходы программы). Тогда в момент, когда человек сделает ход, не исключено, что у компьютера уже будет готов ответ.
scenario = {} # Хэш ход-ответ.
humans_turn = true
thinking_ahead = Thread.new(board) do
predict_move do |m|
scenario[m] = my_response(board,m)
Thread.exit if humans_turn == false
end
end
human_move = get_human_move(board)
humans_turn = false # Остановить поток.
# Теперь можно посмотреть, нет ли в хэше scenario хода,
# сделанного пользователем...
Конечно, настоящие шахматные программы работают не так.
13.2.9. Реализация параллельных итераторов
Предположим, что нужно параллельно обходить несколько объектов, то есть для каждого объекта найти первый элемент, потом второй, потом третий и т.д.
Рассмотрим следующий пример. Пусть compose — имя магического метода, который выполняет композицию итераторов. Допустим еще, что у каждого объекта есть стандартный итератор each и что каждый объект возвращает по одному элементу на каждой итерации.
arr1 = [1, 2, 3, 4]
arr2 = [5, 10, 15, 20]
compose(arr1, arr2) {|a,b| puts "#{а} и #{b}" }
# Должно быть напечатано:
# 1 и 5
# 2 и 10
# 3 и 15
# 4 и 20
Можно было бы, конечно, использовать для этой цели zip. Но если нужно более элегантное решение, при котором все элементы не будут храниться одновременно, то без потоков не обойтись. Такое решение представлено в листинге 13.7.
Листинг 13.7. Параллельные итераторы
def compose(*objects)
threads = []
for obj in objects do
threads << Thread.new(obj) do |myobj|
me = Thread.current
me[:queue] = []
myobj.each {|x| me[:queue].push(x) }
end
end
list = [0] # Фиктивное значение, отличное от nil.
while list.nitems > 0 do # Еще есть не nil.
list = []
for thr in threads
list << thr[:queue].shift # Удалить по одному из каждого.
end
yield list if list.nitems > 0 # He вызывать yield, если все равны nil.
end
end
x = [1, 2, 3, 4, 5, 6, 7, 8]
y = " первый\n второй\n третий\n четвертый\n пятый\n"
z = %w[a b с d e f]
compose(x, у, z) {|a,b,c| p [a, b, c] }
# Выводится:
# [1, " первый\n", "a"]
# [2, " второй\n", "b"]
# [3, " третий\n", "c"]
# [4, " четвертый\n", "d"]
# [5, " пятый\n", "e"]
# [6, nil, "f"]
# [7, nil, nil]
# [8, nil, nil]
Обратите внимание: мы не предполагаем, что все объекты имеют одно и то же число элементов. Если один итератор доходит до конца раньше остальных, то он будет генерировать значения nil до тех пор, пока не закончит работу «самый длинный» итератор.
Конечно, можно написать и более общий метод, который на каждой итерации будет обрабатывать более одного элемента. (В конце концов, не все итераторы возвращают по одному значению за раз.) Можно было бы в первом параметре передавать число значений для каждого итератора.
Можно также пользоваться произвольными итераторами (а не только стандартным each). Их имена можно было бы передавать в виде строк, а вызывать с помощью метода send. Много чего еще можно придумать.
Впрочем, мы полагаем, что приведенного кода достаточно для большинства целей. Вариации на эту тему оставляем читателю в качестве упражнения.
13.2.10. Параллельное рекурсивное удаление
Забавы ради напишем код, который будет удалять дерево каталогов. Процедура рекурсивного удаления использует потоки. Как только обнаруживается очередной подкаталог, мы запускаем новый поток, который будет обходить его и удалять содержимое.
Созданные в ходе работы программы потоки хранятся в массиве threads. Поскольку это локальная переменная, у каждого потока будет собственная копия массива. Раз к ней может обращаться всего один поток, синхронизировать доступ не надо.
Отметим также, что в блок потока передается полное имя файла fullname, чтобы не нужно было беспокоиться по поводу того, что поток обращается к переменной, которую кто-то еще изменяет. Поток делает для себя локальную копию fn этой переменной.
Прежде чем удалять очередной каталог, мы должны дождаться завершения всех созданных в процессе его обхода потоков.
def delete_all(dir)
threads = []
Dir.foreach(dir) do |e|
next if [".",".."].include? e # Пропустить . и ..
fullname = dir + "/" + e
if FileTest.directory?(fullname)
threads << Thread.new(fullname) {|fn| delete_all(fn) }
else
File.delete(fullname)
end
end
threads.each { |t| t.join }
Dir.delete(dir)
end
delete_all("/tmp/stuff")
Будет ли работать такая программа быстрее, чем ее вариант без потоков? В наших тестах получалось по-разному. Возможно, это зависит от операционной системы и структуры конкретного каталога — глубины, количества файлов и т.д.
13.3. Заключение
Как было сказано, в Ruby не используются платформенные потоки. Программа не станет работать быстрее при наличии нескольких процессоров, но некоторого распараллеливания работы достичь все же можно. Потоки полезны во многих случаях» но писать и отлаживать многопоточную программу довольно трудно, особенно если для получения правильного результата приходится применять изощренные способы синхронизации.
Для синхронизации Ruby предоставляет такие классы, как Mutex, Monitor и ConditionVariable. Имеются также безопасные относительно потоков классы очередей Queue и SizedQueue.
В главе 14 мы перейдем от обсуждения техники программирования к решению конкретных задач, а именно сценариев системного администрирования.