Программирование на языке Ruby

Фултон Хэл

Глава 20. Распределенный Ruby

 

 

В настоящее время имеется немало технологий, поддерживающих распределенную обработку: различные варианты RPC, а также COM, CORBA, DCE и Java RMI.

Одни проще, другие сложнее, но в принципе все делают одно и то же - предоставляют относительно прозрачный способ связи между находящимися в сети объектами так, чтобы с удаленными объектами можно было работать, как с локальными.

Зачем это вообще может понадобиться? Причин много. Например, чтобы распределить некоторую вычислительную задачу между многими процессорами. Примером может послужить программа SETI@home, которая использует ваш ПК для обработки небольших объемов данных в поисках внеземного разума (кстати, эта программа не является проектом института SETI). Другой пример — привлечение широких масс к взлому шифра RSA129 (эта попытка увенчалась успехом несколько лет назад). Существует очень много задач, которые можно разбить на небольшие части, пригодные для распределенного решения.

Можно также представить себе, что вы хотите предоставить интерфейс к некоему сервису, не раскрывая исходных текстов. Часто это делается с помощью Web-приложений, но из-за отсутствия состояния в протоколе HTTP это не всегда удобно (есть и другие недостатки). Механизм распределенного программирования позволяет решать подобные задачи более естественно.

В мире Ruby ответом на этот вызов стала программа drb, написанная Масатоси Секи (Masatoshi Seki); еще ее название записывают так: DRb. Существуют и другие способы распределенной обработки на Ruby, но drb, пожалуй, самый легкий. Здесь нет сложных служб разрешения имен, как в CORBA. Это всего лишь простая и удобная библиотека, предоставляющая всю необходимую функциональность. В данной главе мы рассмотрим основы работы как с ней самой, так и с надстроенной над ней системой Rinda.

 

20.1. Обзор: библиотека drb

Библиотека drb состоит из двух основных частей: серверной и клиентской. Грубую границу между ними можно провести следующим образом:

Сервер:

• запускает TCPServer и начинает прослушивать порт;

• привязывает объект к экземпляру сервера drb;

• принимает запросы на соединение от клиентов и отвечает на их сообщения;

• дополнительно может предоставлять контроль доступа (безопасность).

Клиент:

• устанавливает соединение с сервером;

• привязывает локальный объект к удаленному экземпляру сервера;

• посылает сообщения серверу и получает ответы.

Метод класса start_service отвечает за запуск TCP-сервера, прослушивающего указанный порт. Он принимает два параметра: URI (универсальный идентификатор ресурса), задающий порт (если он равен nil, то порт выбирается динамически), и объект, к которому мы хотим привязаться. Этот объект будет доступен удаленному клиенту, который сможет вызывать его методы, как если бы объект был локальным.

require "drb"

myobj = MyServer.new

DRb.start_service("druby://:1234", myobj) # Порт 1234.

# ...

Если порт выбирается динамически, то для получения полного URI, включающего и номер порта, можно воспользоваться методом класса uri.

DRb.start_service(nil, myobj)

myURI = DRb.uri # "druby://hal9000:2001"

Поскольку drb —многопоточная программа, любое серверное приложение должно выполнять join в потоке сервера (чтобы не дать приложению завершиться преждевременно и тем самым уничтожить выполняющийся поток).

# Предотвратить преждевременный выход.

DRb.thread.join

На стороне клиента мы вызываем метод start_service без параметров и с помощью класса DRbObject создаем локальный объект, соответствующий удаленному. Обычно первым параметром методу DRbObject.new передается nil.

require "drb"

DRb.start_service

obj = DRbObject.new(nil, "druby://hal9000:2001")

# Сообщения, передаваемые obj, перенаправляются

# удаленному объекту на стороне сервера...

Следует подчеркнуть, что на стороне сервера привязка осуществляется к единственному объекту, который должен отвечать на все получаемые запросы. Если клиентов несколько, то объект должен быть безопасным относительно потоков, чтобы не оказаться в некорректном состоянии. (Для совсем простых или узкоспециализированных приложений это может быть и необязательно.)

Мы не можем вдаваться в технические детали. Но имейте в виду, что если клиент читает или изменяет внутреннее состояние удаленного объекта, то при наличии нескольких клиентов возможна интерференция. Во избежание таких неприятностей мы рекомендуем применять механизмы синхронизации, например класс Mutex. (Подробнее о потоках и синхронизации рассказывается в главе 13.)

Скажем хотя бы несколько слов о безопасности. Ведь не всегда желательно, чтобы с вашим сервером мог соединяться кто угодно. Помешать им пытаться вы не можете, зато можете сделать такие попытки безуспешными.

В программе drb есть понятие списка контроля доступа (ACL). Это не что иное, как списки клиентов (или категорий клиентов), которым явно разрешен (или запрещен) доступ.

Приведем пример. Для создания нового списка ACL мы воспользуемся классом ACL, которому передадим один или два параметра.

Второй (необязательный) параметр метода ACL.new служит для ответа на вопрос: «Мы запрещаем доступ всем клиентам, кроме некоторых, или, наоборот, разрешаем доступ всем клиентам, кроме некоторых?» По умолчанию принимается первый вариант, который обозначается константой DENY_ALLOW равной 0. Второй режим обозначается ALLOW_DENY равной 1.

Первый параметр ACL.new представляет собой обычный массив строк, которые идут парами. Первая строка в паре должна быть равна "deny" или "allow", вторая описывает одного клиента или группу клиентов (по имени или по адресу):

require "drb/acl"

acl = ACL.new( %w[ deny all

 allow 192.168.0.*

 allow 210.251.121.214

 allow localhost] )

Первая пара в этом примере, строго говоря, излишня, но проясняет смысл всей конструкции.

А как используются ACL? Метод install_acl приводит ACL в действие. Его необходимо вызывать перед обращением к start_service, иначе он не возымеет эффекта.

# Продолжение примера...

DRb.install_acl(acl)

DRb.start_service(nil, some_object)

# ...

Теперь, после запуска сервиса любой неавторизованный запрос на соединение приведет к исключению RuntimeError.

Это, конечно, не все, что можно сказать о библиотеке drb. Но для обзора вполне достаточно. В следующем разделе мы рассмотрим простой drb-сервер и drb-клиент, близкие к реальным программам. А затем поговорим о программах Rinda и Ring.

 

20.2. Пример: эмуляция биржевой ленты

В этом примере сервер публикует в сети биржевые котировки акций. К серверу может обратиться любой клиент, желающий узнать, сколько сейчас стоит его пакет.

Но мы добавили одну тонкость. Не желая следить за малейшими колебаниями цен, мы реализовали модуль Observer, который позволяет подписаться на информационный канал. Клиент следит за поступающими сведениями и предупреждает нас, когда изменение цены превысит заданный порог.

Сначала рассмотрим модуль DrbObservable. Это прямолинейная реализация паттерна Observer (Наблюдатель), описанного в замечательной книге Э. Гаммы, Р. Хелма, Р. Джонсона и Дж. Влиссидеса «Паттерны проектирования» (см. сноску в разделе 12.3.1). Еще этот паттерн называют «Издатель-Подписчик».

В листинге 20.1 наблюдатель определен как объект, отвечающий на вызов метода update. Сервер добавляет наблюдателей по их просьбе и посылает им уведомления, обращаясь к методу notify_observers.

Листинг 20.1. Модуль DrbObservable

module DRbObservable

 def add_observer(observer)

  @observer_peers ||= []

  unless observer.respond_to? :update

   raise NameError, "наблюдатель должен отвечать на вызов 'update'"

  end

  @observer_peers.push observer

 end

 def delete_observer(observer)

  @observer_peers.delete observer if defined? @observer_peers

 end

 def notify_observers(*arg)

  return unless defined? @observer_peers

  for i in @observer_peers.dup

   begin

    i.update(*arg)

   rescue

    delete_observer(i)

   end

  end

 end

end

Сервер (он же канал) в листинге 20.2 эмулирует биржевые котировки с помощью последовательности псевдослучайных чисел (простите мою иронию, но это очень точно соответствует характеру рынка). Символ, идентифицирующий компанию, — всего лишь косметическое украшение, никакого реального смысла в этой программе он не имеет. При каждом изменении цены посылается уведомление всем наблюдателям.

Листинг 20.2. Канал биржевых котировок (drb-сервер)

require "drb"

require "drb_pbserver"

# Генерировать случайные котировки.

class MockPrice

 MIN = 75

 RANGE = 50

 def initialize(symbol)

  @price = RANGE / 2

 end

 def price

  @price += (rand() - 0.5)*RANGE

  if @price < 0

   @price = -@price

  elsif @price >= RANGE

   @price = 2*RANGE - @price

  end

  MIN + @price

 end

end

class Ticker # Периодически получать котировку акций.

 include DRbObservable

 def initialize(price_feed)

  @feed = price_feed

  Thread.new { run }

 end

 def run

  lastPrice = nil

  loop do

   price = @feed.price

   print "Текущая котировка: #{price}\n"

   if price != lastPrice

    lastPrice = price

    notify_observers(Time.now, price)

   end

   sleep 1

  end

 end

end

ticker = Ticker.new(MockPrice.new("MSFT"))

DRb.start_service('druby://localhost:9001', ticker)

puts 'Нажмите [return] для завершения.'

gets

На платформе Windows примененный способ завершения программы вызывает сложности. Функция gets в этом случае может блокировать главный поток. Если вы это видите, попробуйте вместо обращения к gets поставить DRb.thread.join (а завершайте программу нажатием Ctrl+C).

Неудивительно, что клиент (листинг 20.3) начинает с установления соединения с сервером. Он получает ссылку на объект показа котировок и устанавливает верхний и нижний пороги изменения цены. Затем клиент выводит сообщение пользователю всякий раз, как цена выходит за пределы указанного диапазона.

Листинг 20.3. Наблюдатель биржевых котировок (drb-клиент)

require "drb"

class Warner

 include DRbUndumped

 def initialize(ticker, limit)

  @limit = limit

  ticker.add_observer(self) # Любой объект Warner

                            # является наблюдателем.

 end

end

class WarnLow < Warner

 def update(time, price)    # Обратный вызов наблюдателя.

  if price < @limit

   print "--- #{time.to_s}: Цена ниже #@limit: #{price}\n"

  end

 end

end

class WarnHigh < Warner

 def update(time, price)    # Обратный вызов наблюдателя.

  if price > @limit

   print "+++ #{time.to_s}: Цена выше #@limit: #{price}\n"

  end

 end

end

DRb.start_service

ticker = DRbObject.new(nil, "druby://localhost:9001")

WarnLow.new(ticker, 90)

WarnHigh.new(ticker, 110)

puts 'Нажмите [return] для завершения.'

gets

Модуль DRbUndumped (см. листинге 20.3) следует включать в любой объект, который не нужно подвергать маршалингу. Самого присутствия этого модуля в числе предков объекта достаточно, чтобы drb не пытался применять к нему маршалинг. Вот исходный текст этого модуля целиком:

module DrbUndumped

def _dump(dummy)

raise TypeError, "can't dump"

end

end

Приложение из этого раздела достаточно содержательно, и в то же время в нем легко разобраться. Есть и другие подходы к решению подобных задач. Но способ, показанный нами, демонстрирует простоту и элегантность распределенного Ruby.

 

20.3. Rinda: пространство кортежей в Ruby

Термин «пространство кортежей» появился в 1985 году, а сама идея еще старше. Кортежем называется массив или вектор, состоящий из элементов данных (как строка в таблице базы данных). Пространство кортежей — это большое объектное пространство, наполненное кортежами, нечто вроде «информационного супа».

Пока реализация пространства кортежей кажется ничем не примечательной. Но все становится гораздо интереснее, стоит лишь осознать, что к нему могут обращаться многие клиенты и доступ должен синхронизироваться. Короче говоря, это распределенная сущность; любой клиент может читать из пространства кортежей или писать в него, то есть его можно рассматривать как большое распределенное хранилище или даже способ коммуникации.

Первой реализацией пространства кортежей был проект Linda — исследование в области параллельного программирования, выполненное в Йельском университете в 1980-х годах. Реализация на языке Ruby (конечно, на основе библиотеки drb), естественно, называется Rinda.

Кортеж в Rinda может быть массивом или хэшем. На хэш налагается дополнительное ограничение: все ключи должны быть строками. Вот несколько примеров простых кортежей:

t1 = [:add, 5, 9]

t2 = [:name, :add_service, Adder.new, nil]

t3 = { 'type' => 'add', 'value_1' => 5, 'value_2' => 9 }

Элемент кортежа может быть произвольным объектом; это работает, потому что drb умеет выполнять маршалинг и демаршалинг объектов Ruby. (Конечно, необходимо либо включить модуль DRbUndumped, либо сделать определения объектов доступными серверу.)

Пространство объектов создается методом new:

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

# ...

Поэтому сервер выглядит так:

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

DRb.start_service("druby://somehost:9000", ts)

gets # Нажать Enter для завершения сервера.

А клиент — так:

require 'rinda/tuplespace'

DRb.start_service

ts = DRbObject.new(nil, "druby://somehost:9000")

# ...

К пространству кортежей в Rinda применимы пять операций: read, read_all, write, take и notify.

Операция чтения read позволяет получить один кортеж. Но способ идентификации кортежа не вполне очевиден: необходимо задать кортеж, соответствующий искомому; при этом nil соответствует любому значению.

t1 = ts.read [:Sum,nil] # Может извлечь, например, [:Sum, 14].

Обычно операция read блокирует выполнение программы (для синхронизации). Чтобы быстро проверить существование кортежа, можно выполнить неблокирующее чтение, задав нулевой тайм-аут:

t2 = ts.read [:Result,nil],0 # Возбуждает исключение, если кортеж

# не существует.

Если мы точно знаем или предполагаем, что образцу будет соответствовать не один, а несколько кортежей, можно воспользоваться методом read_all, который возвращает массив:

tuples = ts.read_all [:Foo, nil, nil]

tuples.each do |t|

# ...

end

Метод read_all не принимает второго параметра. Он всегда блокирует программу, если не найдено ни одного подходящего кортежа.

Операция take — это чтение, за которым следует удаление. Иными словами, метод take удаляет кортеж из пространства кортежей и возвращает его вызывающей программе:

t = ts.take [:Sum, nil] # Кортежа больше нет в пространстве кортежей.

Может возникнуть вопрос, почему не существует явного способа удаления. Надо полагать, что этой цели служит метод take.

Метод write помещает кортеж в пространство кортежей. Второй параметр показывает, сколько секунд кортеж может существовать, прежде чем система сочтет, что срок его хранения истек. (По умолчанию его значение равно nil, то есть срок хранения не ограничен.)

ts.write [:Add, 5, 9]      # Хранить "вечно".

ts.write [:Foo, "Bar"], 10 # Хранить 10 секунд.

Здесь уместно будет сказать несколько слов о синхронизации. Предположим, что два клиента пытаются одновременно забрать (take) один и тот же кортеж. Одному это удастся, а другой будет заблокирован. Если первый клиент затем изменит кортеж и запишет (write) его обратно в хранилище, то второй получит модифицированную версию. Можно считать, что операция «обновления» — это последовательность take и write, которая не приводит к потере данных. Конечно, как и при любом варианте многопоточного программирования, нужно позаботиться о том, чтобы не возникали тупиковые ситуации.

Метод notify позволяет следить за пространством кортежей и получать уведомления, когда над интересующим вас кортежем была выполнена какая-то операция. Этот метод возвращает объект NotifyTemplateEntry и может наблюдать на операциями четырех видов:

• write;

• take;

• удаление (когда истекает срок хранения кортежа);

• закрытие (когда истекает срок хранения объекта NotifyTemplateEntry).

Поскольку операция чтения ничего не изменяет, то система не поддерживает уведомлений о чтениях. В листинге 20.4 приведен пример использования notify.

Листинг 20.4. Уведомление в системе Rinda

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

alberts = ts.notify "write", ["Albert", nil]

martins = ts.notify "take", ["Martin", nil]

thr1 = Thread.new do

 alberts.each {|op,t| puts "#{op}: #{t.join(' ')}" }

end

thr2 = Thread.new do

 martins.each {|op,t| puts "#{op}: #{t.join(' ')}" }

end

sleep 1

ts.write ["Martin", "Luther"]

ts.write ["Albert", "Einstein"]

ts.write ["Martin", "Fowler"]

ts.write ["Alberf, "Schweitzer"]

ts.write ["Martin", "Scorsese"]

ts.take ["Martin", "Luther"]

# Выводится:

# write: Albert Einstein

# write: Albert Schweitzer

# take: Martin Luther

Мы видели, что read и другие операции пользуются шаблонами для сопоставления с образцами (и этим напоминают регулярные выражения). Мы уже знаем, что nil выступает в роли метасимвола, но можно указать и класс; ему будет соответствовать любой экземпляр этого класса.

tem1 = ["X", Integer]  # Соответствует ["X",5], но не ["X","Files"].

tem2 = ["X", NilClass] # Соответствует литералу nil в кортеже.

Кроме того, разрешается определять собственный оператор ветвящегося равенства (===), если вы хотите проводить сопоставление особым способом. В противном случае для сравнения будет использован стандартный оператор ===.

Время жизни кортежа можно задать в момент записи. В сочетании с величинами тайм-аутов для различных операций над кортежами это позволяет ограничить время выполнения простых и более сложных манипуляций.

Тот факт, что у кортежа может быть конечный срок хранения, заодно означает, что по истечении этого срока кортеж можно обновить с помощью специально написанного объекта. В библиотеке имеется готовый класс SimpleRenewer, который каждые 180 секунд обращается к drb-серверу, создавшему кортеж. Если сервер не отвечает, то кортеж удаляется. Но не пытайтесь программировать обновление, пока не освоитесь с парадигмой пространства кортежей.

В листинге 20.5 приведен еще один пример работы с пространством кортежей. Он решает ту же задачу о производителе и потребителе, которая была рассмотрена в главе 13.

Листинг 20.5. Задача о производителе и потребителе

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

producer = Thread.new do

 item = 0

 loop do

  sleep rand(0)

  puts "Производитель произвел ##{item}"

  ts.write ["Item",item]

  item += 1

 end

end

consumer = Thread.new do

 loop do

  sleep rand(0)

  tuple = ts.take ["Item", nil]

  word, item = tuple

  puts "Потребитель потребил ##{item}"

 end

end

sleep 60 # Работать одну минуту, потом завершиться и завершить потоки.

 

20.4. Обнаружение сервисов в распределенном Ruby

Методика обнаружения сервисов может оказаться полезной, когда имеется много локально работающих сервисов, поскольку дает возможность находить сервис по имени. Если же число сервисов невелико и их местонахождение точно известно, особого смысла в автоматическом обнаружении нет.

Раз уж вы продолжили чтение, то, наверное, хотите знать, как работает механизм обнаружения сервисов. Такую возможность предоставляет библиотека Rinda::Ring (естественно, основанная на системе Rinda). В чем-то она похожа на службу DNS; это центральная служба регистрации, где хранится информация (в виде пространства кортежей) о drb-процессах. Сервисы drb могут по протоколу UDP найти ближайший сервер регистрации, объявить о своем присутствии или найти другие работающие поблизости сервисы.

Класс Rinda::RingServer реализует сервер регистрации. Он управляет пространством имен, в котором хранится информация о местонахождении других drb-сервисов. RingServer прослушивает сеть в ожидании широковещательных UDP-пакетов с запросами о местонахождении сервера. В ответ на такой запрос он устанавливает соединение (посредством drb) с отправившим его сервисом. Пример:

require 'rinda/ring'

require 'rinda/tuplespace'

DRb.start_service

Rinda::RingServer.new(Rinda::TupleSpace.new)

DRb.thread.join

Класс Rinda::RingProvider регистрирует сервис, объявляя о его присутствии серверу RingServer. При этом сообщается о типе сервиса и о фронтальном объекте, предоставляющем этот сервис, а также передается описательная информация. В следующем примере мы создаем простой сервис Adder, который складывает два числа, а потом объявляем о нем всему миру:

require 'rinda/ring'

class Adder

 include DRbUndumped

 def add(val1, val2)

  return val1 + val2

 end

end

adder = Adder.new

DRb.start_service(nil, adder)

Rinda::RingProvider.new(:adder, adder, 'Simple Adder')

DRb.thread.join

Класс Rinda::RingFinger (названный так по аналогии с имеющейся в UNIX командой finger) применяется для обнаружения сервера RingServer. Он посылает широковещательный UDP-пакет и ждет ответа от сервера. Затем RingFinger можно использовать для поиска объявленных сервисов в пространстве кортежей.

require 'rinda/ring'

DRb.start_service

rs = Rinda::RingFinger.primary

list = [rs] + Rinda::Ringfinger.to_a

svc = list.find_all [:name, :adder, nil, nil]

 

20.5. Заключение

Эта глава содержит введение в распределенный Ruby. Мы познакомились с тем, как сервис запускается и взаимодействует с клиентами, а также рассмотрели вопросы безопасности.

Мы выяснили, что система Rinda может выступать в роли простого распределенного хранилища объектов, обеспечивающего синхронизацию доступа. Наконец, было показано, как можно использовать библиотеку Rinda::Ring для обнаружения drb-сервисов.

На этом рассмотрение распределенного Ruby заканчивается. Переходим к следующей теме — инструментам разработки на языке Ruby, в частности программе Rake, оболочке irb, интегрированным средам разработки (IDE) и др.