В настоящее время имеется немало технологий, поддерживающих распределенную обработку: различные варианты RPC, а также COM, CORBA, DCE и Java RMI.
Одни проще, другие сложнее, но в принципе все делают одно и то же - предоставляют относительно прозрачный способ связи между находящимися в сети объектами так, чтобы с удаленными объектами можно было работать, как с локальными.
Зачем это вообще может понадобиться? Причин много. Например, чтобы распределить некоторую вычислительную задачу между многими процессорами. Примером может послужить программа SETI@home, которая использует ваш ПК для обработки небольших объемов данных в поисках внеземного разума (кстати, эта программа не является проектом института SETI). Другой пример — привлечение широких масс к взлому шифра RSA129 (эта попытка увенчалась успехом несколько лет назад). Существует очень много задач, которые можно разбить на небольшие части, пригодные для распределенного решения.
Можно также представить себе, что вы хотите предоставить интерфейс к некоему сервису, не раскрывая исходных текстов. Часто это делается с помощью Web-приложений, но из-за отсутствия состояния в протоколе HTTP это не всегда удобно (есть и другие недостатки). Механизм распределенного программирования позволяет решать подобные задачи более естественно.
В мире Ruby ответом на этот вызов стала программа drb, написанная Масатоси Секи (Masatoshi Seki); еще ее название записывают так: DRb. Существуют и другие способы распределенной обработки на Ruby, но drb, пожалуй, самый легкий. Здесь нет сложных служб разрешения имен, как в CORBA. Это всего лишь простая и удобная библиотека, предоставляющая всю необходимую функциональность. В данной главе мы рассмотрим основы работы как с ней самой, так и с надстроенной над ней системой Rinda.
20.1. Обзор: библиотека drb
Библиотека drb состоит из двух основных частей: серверной и клиентской. Грубую границу между ними можно провести следующим образом:
Сервер:
• запускает TCPServer и начинает прослушивать порт;
• привязывает объект к экземпляру сервера drb;
• принимает запросы на соединение от клиентов и отвечает на их сообщения;
• дополнительно может предоставлять контроль доступа (безопасность).
Клиент:
• устанавливает соединение с сервером;
• привязывает локальный объект к удаленному экземпляру сервера;
• посылает сообщения серверу и получает ответы.
Метод класса start_service отвечает за запуск TCP-сервера, прослушивающего указанный порт. Он принимает два параметра: URI (универсальный идентификатор ресурса), задающий порт (если он равен nil, то порт выбирается динамически), и объект, к которому мы хотим привязаться. Этот объект будет доступен удаленному клиенту, который сможет вызывать его методы, как если бы объект был локальным.
require "drb"
myobj = MyServer.new
DRb.start_service("druby://:1234", myobj) # Порт 1234.
# ...
Если порт выбирается динамически, то для получения полного URI, включающего и номер порта, можно воспользоваться методом класса uri.
DRb.start_service(nil, myobj)
myURI = DRb.uri # "druby://hal9000:2001"
Поскольку drb —многопоточная программа, любое серверное приложение должно выполнять join в потоке сервера (чтобы не дать приложению завершиться преждевременно и тем самым уничтожить выполняющийся поток).
# Предотвратить преждевременный выход.
DRb.thread.join
На стороне клиента мы вызываем метод start_service без параметров и с помощью класса DRbObject создаем локальный объект, соответствующий удаленному. Обычно первым параметром методу DRbObject.new передается nil.
require "drb"
DRb.start_service
obj = DRbObject.new(nil, "druby://hal9000:2001")
# Сообщения, передаваемые obj, перенаправляются
# удаленному объекту на стороне сервера...
Следует подчеркнуть, что на стороне сервера привязка осуществляется к единственному объекту, который должен отвечать на все получаемые запросы. Если клиентов несколько, то объект должен быть безопасным относительно потоков, чтобы не оказаться в некорректном состоянии. (Для совсем простых или узкоспециализированных приложений это может быть и необязательно.)
Мы не можем вдаваться в технические детали. Но имейте в виду, что если клиент читает или изменяет внутреннее состояние удаленного объекта, то при наличии нескольких клиентов возможна интерференция. Во избежание таких неприятностей мы рекомендуем применять механизмы синхронизации, например класс Mutex. (Подробнее о потоках и синхронизации рассказывается в главе 13.)
Скажем хотя бы несколько слов о безопасности. Ведь не всегда желательно, чтобы с вашим сервером мог соединяться кто угодно. Помешать им пытаться вы не можете, зато можете сделать такие попытки безуспешными.
В программе drb есть понятие списка контроля доступа (ACL). Это не что иное, как списки клиентов (или категорий клиентов), которым явно разрешен (или запрещен) доступ.
Приведем пример. Для создания нового списка ACL мы воспользуемся классом ACL, которому передадим один или два параметра.
Второй (необязательный) параметр метода ACL.new служит для ответа на вопрос: «Мы запрещаем доступ всем клиентам, кроме некоторых, или, наоборот, разрешаем доступ всем клиентам, кроме некоторых?» По умолчанию принимается первый вариант, который обозначается константой DENY_ALLOW равной 0. Второй режим обозначается ALLOW_DENY равной 1.
Первый параметр ACL.new представляет собой обычный массив строк, которые идут парами. Первая строка в паре должна быть равна "deny" или "allow", вторая описывает одного клиента или группу клиентов (по имени или по адресу):
require "drb/acl"
acl = ACL.new( %w[ deny all
allow 192.168.0.*
allow 210.251.121.214
allow localhost] )
Первая пара в этом примере, строго говоря, излишня, но проясняет смысл всей конструкции.
А как используются ACL? Метод install_acl приводит ACL в действие. Его необходимо вызывать перед обращением к start_service, иначе он не возымеет эффекта.
# Продолжение примера...
DRb.install_acl(acl)
DRb.start_service(nil, some_object)
# ...
Теперь, после запуска сервиса любой неавторизованный запрос на соединение приведет к исключению RuntimeError.
Это, конечно, не все, что можно сказать о библиотеке drb. Но для обзора вполне достаточно. В следующем разделе мы рассмотрим простой drb-сервер и drb-клиент, близкие к реальным программам. А затем поговорим о программах Rinda и Ring.
20.2. Пример: эмуляция биржевой ленты
В этом примере сервер публикует в сети биржевые котировки акций. К серверу может обратиться любой клиент, желающий узнать, сколько сейчас стоит его пакет.
Но мы добавили одну тонкость. Не желая следить за малейшими колебаниями цен, мы реализовали модуль Observer, который позволяет подписаться на информационный канал. Клиент следит за поступающими сведениями и предупреждает нас, когда изменение цены превысит заданный порог.
Сначала рассмотрим модуль DrbObservable. Это прямолинейная реализация паттерна Observer (Наблюдатель), описанного в замечательной книге Э. Гаммы, Р. Хелма, Р. Джонсона и Дж. Влиссидеса «Паттерны проектирования» (см. сноску в разделе 12.3.1). Еще этот паттерн называют «Издатель-Подписчик».
В листинге 20.1 наблюдатель определен как объект, отвечающий на вызов метода update. Сервер добавляет наблюдателей по их просьбе и посылает им уведомления, обращаясь к методу notify_observers.
Листинг 20.1. Модуль DrbObservable
module DRbObservable
def add_observer(observer)
@observer_peers ||= []
unless observer.respond_to? :update
raise NameError, "наблюдатель должен отвечать на вызов 'update'"
end
@observer_peers.push observer
end
def delete_observer(observer)
@observer_peers.delete observer if defined? @observer_peers
end
def notify_observers(*arg)
return unless defined? @observer_peers
for i in @observer_peers.dup
begin
i.update(*arg)
rescue
delete_observer(i)
end
end
end
end
Сервер (он же канал) в листинге 20.2 эмулирует биржевые котировки с помощью последовательности псевдослучайных чисел (простите мою иронию, но это очень точно соответствует характеру рынка). Символ, идентифицирующий компанию, — всего лишь косметическое украшение, никакого реального смысла в этой программе он не имеет. При каждом изменении цены посылается уведомление всем наблюдателям.
Листинг 20.2. Канал биржевых котировок (drb-сервер)
require "drb"
require "drb_pbserver"
# Генерировать случайные котировки.
class MockPrice
MIN = 75
RANGE = 50
def initialize(symbol)
@price = RANGE / 2
end
def price
@price += (rand() - 0.5)*RANGE
if @price < 0
@price = -@price
elsif @price >= RANGE
@price = 2*RANGE - @price
end
MIN + @price
end
end
class Ticker # Периодически получать котировку акций.
include DRbObservable
def initialize(price_feed)
@feed = price_feed
Thread.new { run }
end
def run
lastPrice = nil
loop do
price = @feed.price
print "Текущая котировка: #{price}\n"
if price != lastPrice
lastPrice = price
notify_observers(Time.now, price)
end
sleep 1
end
end
end
ticker = Ticker.new(MockPrice.new("MSFT"))
DRb.start_service('druby://localhost:9001', ticker)
puts 'Нажмите [return] для завершения.'
gets
На платформе Windows примененный способ завершения программы вызывает сложности. Функция gets в этом случае может блокировать главный поток. Если вы это видите, попробуйте вместо обращения к gets поставить DRb.thread.join (а завершайте программу нажатием Ctrl+C).
Неудивительно, что клиент (листинг 20.3) начинает с установления соединения с сервером. Он получает ссылку на объект показа котировок и устанавливает верхний и нижний пороги изменения цены. Затем клиент выводит сообщение пользователю всякий раз, как цена выходит за пределы указанного диапазона.
Листинг 20.3. Наблюдатель биржевых котировок (drb-клиент)
require "drb"
class Warner
include DRbUndumped
def initialize(ticker, limit)
@limit = limit
ticker.add_observer(self) # Любой объект Warner
# является наблюдателем.
end
end
class WarnLow < Warner
def update(time, price) # Обратный вызов наблюдателя.
if price < @limit
print "--- #{time.to_s}: Цена ниже #@limit: #{price}\n"
end
end
end
class WarnHigh < Warner
def update(time, price) # Обратный вызов наблюдателя.
if price > @limit
print "+++ #{time.to_s}: Цена выше #@limit: #{price}\n"
end
end
end
DRb.start_service
ticker = DRbObject.new(nil, "druby://localhost:9001")
WarnLow.new(ticker, 90)
WarnHigh.new(ticker, 110)
puts 'Нажмите [return] для завершения.'
gets
Модуль DRbUndumped (см. листинге 20.3) следует включать в любой объект, который не нужно подвергать маршалингу. Самого присутствия этого модуля в числе предков объекта достаточно, чтобы drb не пытался применять к нему маршалинг. Вот исходный текст этого модуля целиком:
module DrbUndumped
def _dump(dummy)
raise TypeError, "can't dump"
end
end
Приложение из этого раздела достаточно содержательно, и в то же время в нем легко разобраться. Есть и другие подходы к решению подобных задач. Но способ, показанный нами, демонстрирует простоту и элегантность распределенного Ruby.
20.3. Rinda: пространство кортежей в Ruby
Термин «пространство кортежей» появился в 1985 году, а сама идея еще старше. Кортежем называется массив или вектор, состоящий из элементов данных (как строка в таблице базы данных). Пространство кортежей — это большое объектное пространство, наполненное кортежами, нечто вроде «информационного супа».
Пока реализация пространства кортежей кажется ничем не примечательной. Но все становится гораздо интереснее, стоит лишь осознать, что к нему могут обращаться многие клиенты и доступ должен синхронизироваться. Короче говоря, это распределенная сущность; любой клиент может читать из пространства кортежей или писать в него, то есть его можно рассматривать как большое распределенное хранилище или даже способ коммуникации.
Первой реализацией пространства кортежей был проект Linda — исследование в области параллельного программирования, выполненное в Йельском университете в 1980-х годах. Реализация на языке Ruby (конечно, на основе библиотеки drb), естественно, называется Rinda.
Кортеж в Rinda может быть массивом или хэшем. На хэш налагается дополнительное ограничение: все ключи должны быть строками. Вот несколько примеров простых кортежей:
t1 = [:add, 5, 9]
t2 = [:name, :add_service, Adder.new, nil]
t3 = { 'type' => 'add', 'value_1' => 5, 'value_2' => 9 }
Элемент кортежа может быть произвольным объектом; это работает, потому что drb умеет выполнять маршалинг и демаршалинг объектов Ruby. (Конечно, необходимо либо включить модуль DRbUndumped, либо сделать определения объектов доступными серверу.)
Пространство объектов создается методом new:
require 'rinda/tuplespace'
ts = Rinda::TupleSpace.new
# ...
Поэтому сервер выглядит так:
require 'rinda/tuplespace'
ts = Rinda::TupleSpace.new
DRb.start_service("druby://somehost:9000", ts)
gets # Нажать Enter для завершения сервера.
А клиент — так:
require 'rinda/tuplespace'
DRb.start_service
ts = DRbObject.new(nil, "druby://somehost:9000")
# ...
К пространству кортежей в Rinda применимы пять операций: read, read_all, write, take и notify.
Операция чтения read позволяет получить один кортеж. Но способ идентификации кортежа не вполне очевиден: необходимо задать кортеж, соответствующий искомому; при этом nil соответствует любому значению.
t1 = ts.read [:Sum,nil] # Может извлечь, например, [:Sum, 14].
Обычно операция read блокирует выполнение программы (для синхронизации). Чтобы быстро проверить существование кортежа, можно выполнить неблокирующее чтение, задав нулевой тайм-аут:
t2 = ts.read [:Result,nil],0 # Возбуждает исключение, если кортеж
# не существует.
Если мы точно знаем или предполагаем, что образцу будет соответствовать не один, а несколько кортежей, можно воспользоваться методом read_all, который возвращает массив:
tuples = ts.read_all [:Foo, nil, nil]
tuples.each do |t|
# ...
end
Метод read_all не принимает второго параметра. Он всегда блокирует программу, если не найдено ни одного подходящего кортежа.
Операция take — это чтение, за которым следует удаление. Иными словами, метод take удаляет кортеж из пространства кортежей и возвращает его вызывающей программе:
t = ts.take [:Sum, nil] # Кортежа больше нет в пространстве кортежей.
Может возникнуть вопрос, почему не существует явного способа удаления. Надо полагать, что этой цели служит метод take.
Метод write помещает кортеж в пространство кортежей. Второй параметр показывает, сколько секунд кортеж может существовать, прежде чем система сочтет, что срок его хранения истек. (По умолчанию его значение равно nil, то есть срок хранения не ограничен.)
ts.write [:Add, 5, 9] # Хранить "вечно".
ts.write [:Foo, "Bar"], 10 # Хранить 10 секунд.
Здесь уместно будет сказать несколько слов о синхронизации. Предположим, что два клиента пытаются одновременно забрать (take) один и тот же кортеж. Одному это удастся, а другой будет заблокирован. Если первый клиент затем изменит кортеж и запишет (write) его обратно в хранилище, то второй получит модифицированную версию. Можно считать, что операция «обновления» — это последовательность take и write, которая не приводит к потере данных. Конечно, как и при любом варианте многопоточного программирования, нужно позаботиться о том, чтобы не возникали тупиковые ситуации.
Метод notify позволяет следить за пространством кортежей и получать уведомления, когда над интересующим вас кортежем была выполнена какая-то операция. Этот метод возвращает объект NotifyTemplateEntry и может наблюдать на операциями четырех видов:
• write;
• take;
• удаление (когда истекает срок хранения кортежа);
• закрытие (когда истекает срок хранения объекта NotifyTemplateEntry).
Поскольку операция чтения ничего не изменяет, то система не поддерживает уведомлений о чтениях. В листинге 20.4 приведен пример использования notify.
Листинг 20.4. Уведомление в системе Rinda
require 'rinda/tuplespace'
ts = Rinda::TupleSpace.new
alberts = ts.notify "write", ["Albert", nil]
martins = ts.notify "take", ["Martin", nil]
thr1 = Thread.new do
alberts.each {|op,t| puts "#{op}: #{t.join(' ')}" }
end
thr2 = Thread.new do
martins.each {|op,t| puts "#{op}: #{t.join(' ')}" }
end
sleep 1
ts.write ["Martin", "Luther"]
ts.write ["Albert", "Einstein"]
ts.write ["Martin", "Fowler"]
ts.write ["Alberf, "Schweitzer"]
ts.write ["Martin", "Scorsese"]
ts.take ["Martin", "Luther"]
# Выводится:
# write: Albert Einstein
# write: Albert Schweitzer
# take: Martin Luther
Мы видели, что read и другие операции пользуются шаблонами для сопоставления с образцами (и этим напоминают регулярные выражения). Мы уже знаем, что nil выступает в роли метасимвола, но можно указать и класс; ему будет соответствовать любой экземпляр этого класса.
tem1 = ["X", Integer] # Соответствует ["X",5], но не ["X","Files"].
tem2 = ["X", NilClass] # Соответствует литералу nil в кортеже.
Кроме того, разрешается определять собственный оператор ветвящегося равенства (===), если вы хотите проводить сопоставление особым способом. В противном случае для сравнения будет использован стандартный оператор ===.
Время жизни кортежа можно задать в момент записи. В сочетании с величинами тайм-аутов для различных операций над кортежами это позволяет ограничить время выполнения простых и более сложных манипуляций.
Тот факт, что у кортежа может быть конечный срок хранения, заодно означает, что по истечении этого срока кортеж можно обновить с помощью специально написанного объекта. В библиотеке имеется готовый класс SimpleRenewer, который каждые 180 секунд обращается к drb-серверу, создавшему кортеж. Если сервер не отвечает, то кортеж удаляется. Но не пытайтесь программировать обновление, пока не освоитесь с парадигмой пространства кортежей.
В листинге 20.5 приведен еще один пример работы с пространством кортежей. Он решает ту же задачу о производителе и потребителе, которая была рассмотрена в главе 13.
Листинг 20.5. Задача о производителе и потребителе
require 'rinda/tuplespace'
ts = Rinda::TupleSpace.new
producer = Thread.new do
item = 0
loop do
sleep rand(0)
puts "Производитель произвел ##{item}"
ts.write ["Item",item]
item += 1
end
end
consumer = Thread.new do
loop do
sleep rand(0)
tuple = ts.take ["Item", nil]
word, item = tuple
puts "Потребитель потребил ##{item}"
end
end
sleep 60 # Работать одну минуту, потом завершиться и завершить потоки.
20.4. Обнаружение сервисов в распределенном Ruby
Методика обнаружения сервисов может оказаться полезной, когда имеется много локально работающих сервисов, поскольку дает возможность находить сервис по имени. Если же число сервисов невелико и их местонахождение точно известно, особого смысла в автоматическом обнаружении нет.
Раз уж вы продолжили чтение, то, наверное, хотите знать, как работает механизм обнаружения сервисов. Такую возможность предоставляет библиотека Rinda::Ring (естественно, основанная на системе Rinda). В чем-то она похожа на службу DNS; это центральная служба регистрации, где хранится информация (в виде пространства кортежей) о drb-процессах. Сервисы drb могут по протоколу UDP найти ближайший сервер регистрации, объявить о своем присутствии или найти другие работающие поблизости сервисы.
Класс Rinda::RingServer реализует сервер регистрации. Он управляет пространством имен, в котором хранится информация о местонахождении других drb-сервисов. RingServer прослушивает сеть в ожидании широковещательных UDP-пакетов с запросами о местонахождении сервера. В ответ на такой запрос он устанавливает соединение (посредством drb) с отправившим его сервисом. Пример:
require 'rinda/ring'
require 'rinda/tuplespace'
DRb.start_service
Rinda::RingServer.new(Rinda::TupleSpace.new)
DRb.thread.join
Класс Rinda::RingProvider регистрирует сервис, объявляя о его присутствии серверу RingServer. При этом сообщается о типе сервиса и о фронтальном объекте, предоставляющем этот сервис, а также передается описательная информация. В следующем примере мы создаем простой сервис Adder, который складывает два числа, а потом объявляем о нем всему миру:
require 'rinda/ring'
class Adder
include DRbUndumped
def add(val1, val2)
return val1 + val2
end
end
adder = Adder.new
DRb.start_service(nil, adder)
Rinda::RingProvider.new(:adder, adder, 'Simple Adder')
DRb.thread.join
Класс Rinda::RingFinger (названный так по аналогии с имеющейся в UNIX командой finger) применяется для обнаружения сервера RingServer. Он посылает широковещательный UDP-пакет и ждет ответа от сервера. Затем RingFinger можно использовать для поиска объявленных сервисов в пространстве кортежей.
require 'rinda/ring'
DRb.start_service
rs = Rinda::RingFinger.primary
list = [rs] + Rinda::Ringfinger.to_a
svc = list.find_all [:name, :adder, nil, nil]
20.5. Заключение
Эта глава содержит введение в распределенный Ruby. Мы познакомились с тем, как сервис запускается и взаимодействует с клиентами, а также рассмотрели вопросы безопасности.
Мы выяснили, что система Rinda может выступать в роли простого распределенного хранилища объектов, обеспечивающего синхронизацию доступа. Наконец, было показано, как можно использовать библиотеку Rinda::Ring для обнаружения drb-сервисов.
На этом рассмотрение распределенного Ruby заканчивается. Переходим к следующей теме — инструментам разработки на языке Ruby, в частности программе Rake, оболочке irb, интегрированным средам разработки (IDE) и др.