Dead letter queue что это

Очереди — что это, зачем и как использовать? Посмотрим на возможности AWS SQS

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Сначала давайте дадим определение понятию «очередь — queue».

Возьмем для рассмотрения тип очереди «FIFO»(first in, first out). Если взять значение из википедии — «это абстрактный тип данных с дисциплиной доступа к элементам». Если вкратце, это означает что мы не можем из нее доставать данные в случайном порядке, а только забирать то — что пришло первым.

Далее, нужно определиться зачем вообще они нужны?

1. Для отложенных операций. Классическим примером является обработка картинок. К примеру пользователь загрузил на сайт картинку, которую нам нужно обработать, эта операция занимает много времени, пользователь столько ждать не хочет. Поэтому мы грузим картинку, далее передаем ее в очередь. И она будет обработана, когда какой либо «worker» ее достанет.

2. Для обработки пиковых нагрузок. К примеру, есть какая-то часть системы, на которую иногда обрушивается большой трафик и она не требует мгновенного ответа. Как вариант, генерация каких-либо отчетов. Выкидывая в очередь эту задачу — мы даем возможность обрабатывать это с равномерной нагрузкой на систему.

3. Масштабируемость. И наверное самая важная причина, очередь дает возможность
масштабироваться. Это означает, что вы можете поднять несколько сервисов для обработки параллельно, что сильно повысит производительность.

Теперь давайте рассмотрим проблемы, с которыми столкнемся, если будем создавать очередь сами:

1. Параллельный доступ. Забрать из очереди определенное сообщение может только один обработчик. То есть если одновременно два сервиса попросят сообщения, каждому из них должен вернуться уникальный набор сообщений. Иначе, получится, что одно сообщение обработается два раза. Что может быть чревато.

2. Механизм дедупликации. В сервисе должна быть система, защищающая очередь от дубликатов. Может быть ситуация, в которой случайно в очередь будет отправлено один и тот-же набор данных два раза. В итоге мы одно и тоже обработаем два раза. Что опять же чревато.

3. Механизм обработки ошибок. Допустим наш сервис забрал из очереди три сообщения. Два из которых он успешно обработал, отправив запросы на удаление из очереди. А третье он не смог обработать и умер. Сообщение которое находится в статусе обработки — недоступно для других сервисов. И оно не должно навечно остаться в статусе обработки. Такое сообщение должно передаться другому обработчику по какой-то логике. Вариант реализации такой логики мы рассмотрим скоро на примере AWS SQS(Simple Queue Service)

Amazon Web Services — Simple Queue Service

Теперь давайте рассмотрим как решает эти проблемы SQS и что он может

1. Параллельный доступ. У очереди вы можете задать параметр «Visibility timeout». Он определяет, сколько по времени максимально может длиться обработка сообщения. По умолчанию он равен 30 секундам. Когда какой-либо сервис забирает сообщение, оно переводится в статус «In Flight» на 30 секунд. Если за это время не было команды удалить это сообщение из очереди, оно возвращается в начало и следующий сервис сможет его получить для обработки еще раз.

Небольшая схемка работы.

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

2. Механизм обработки ошибок. В SQS можно настроить вторую очередь для «мертвых» сообщений(Dead Letter Queue). То есть те, которые не смог обработать наш сервис, будут отправляться в отдельную очередь, которой вы можете распоряжаться по своему усмотрению. Также вы можете задавать после которого кол-ва неудачных попыток сообщение перейдет в «мертвую» очередь. Неудачной попыткой считается истечение «Visibility timeout». То есть если за это время не было отправлено запроса на удаление, такое сообщение будет считаться необработанным и вернется в основную очередь или перейдет в «мертвую».

3. Дедупликация сообщений. Так же SQS имеет систему защиты от дубликатов. У каждого сообщения есть «Deduplication Id», SQS не добавит в очередь сообщение с
повторным «Deduplication Id» в течении 5 минут. Вы обязаны задавать «Deduplication Id» в каждом сообщении или включить генерацию id на основе контента. Это означает что в «Deduplication Id» будет попадать хэш сгенерированный исходя из вашего контента. Параметр «Content-Based Deduplication». Подробнее о дедупликации

Notice: Будьте внимательны, если отправить два одинаковых сообщения в течении 5 минут и у вас включен «Content-Based Deduplication» SQS не добавит второе сообщение в очередь.

Notice: Будьте внимательны, к примеру если на девайсе отпала связь, и он не получил ответ и затем отправил повторный запрос спустя 5 минут, дубликат создастся.

4. Long poll. Длительный опрос. SQS поддерживает такой тип подключения с максимальным таймаутом в 20 секунд. Что нам позволяет сэкономить на трафике и «дерганье» сервиса.

5. Метрики. Так же Amazon предоставляет подробные метрики по очередям. Такие как кол-во полученных/отправленных/удаленных сообщений, размеры в КБ этих сообщений и прочее. Также вы можете подключить SQS к сервису логов CloudWatch. Там сможете увидеть еще подробнее. Так же там можно настроить так называемые «сигналы тревоги»(Alarms) и вы можете настраивать действия по каким либо событиям. Подробнее о подключении к SQS. И Документация по CloudWatch

Теперь давайте рассмотрим настройки очереди:

Основные:

Message Retention Period — кол-во секунд/минут/часов/дней, которое означает, сколько по времени будут в очереди храниться необработанные сообщения. Максимально — 14 дней.

Maximum Message Size — максимальный размер сообщения в KB. Значение от 1KB до 256KB.

Receive Message Wait Time — время, сколько будет держаться коннект в случае, если мы используем «Long poll», для получения новых сообщений.

Content-Based Deduplication — флаг, если установлен в true, то в каждое сообщение будет добавлен «Deduplication Id» в виде SHA-256 хэша, сгенерированный из контента.

Настройки «мертвой очереди»

Use Redrive Policy — флаг, если установлен, то сообщения после нескольких попыток будут перенаправляться.

Dead Letter Queue — имя «мертвой» очереди, в которую будут отправляться необработанные сообщения.

Maximum Receives — кол-во неудачных попыток обработки, после которых сообщение будет отправляться в «мертвую» очередь

Notice: Также обратите внимание, что все основные параметры мы можем отправлять вместе с каждым сообщением отдельно. К примеру, каждое отдельное сообщение может иметь свой Visibility Timeout или Delivery Delay

Теперь немного про сами сообщения и их свойства:

Сообщение имеет несколько параметров:

Также есть атрибуты сообщений

Атрибуты состоят из имени, типа и значения.

Notice: Обратите внимание, что максимальное кол-во атрибутов 10 Подробности

Источник

Очереди недоставленных сообщений

Процедура настройки и инструкции по построению для данного образца приведены в конце этого раздела.

в этом примере демонстрируется использование очереди недоставленных сообщений приложения, которая доступна только в Windows Vista. этот пример можно изменить, чтобы использовать системные очереди по умолчанию для MSMQ 3,0 на Windows Server 2003 и Windows XP.

При использовании очередей клиент взаимодействует со службой посредством очереди. Конкретно, клиент отправляет сообщения в очередь. Служба получает сообщения из очереди. Поэтому клиенту и службе не обязательно выполняться одновременно, чтобы взаимодействовать посредством очереди.

Связь с использованием очереди может приводить к определенным задержкам, поэтому следует назначать для сообщений срок жизни, чтобы сообщения не доставлялись приложению позже положенного времени. В некоторых случаях приложению нужно знать, что приложение не доставлено. В таких ситуациях, если истек срок жизни сообщения или его не удалось доставить, сообщение помещается в очередь недоставленных сообщений. Отправляющее приложение может после этого выполнить чтение сообщений из очереди недоставленных сообщений и предпринять корректирующие действия (к ним относятся ситуации от бездействия приложения до исправления причины ошибки доставки и повторной отправки сообщения).

Очередь недоставленных сообщений в привязке NetMsmqBinding выражается в следующих свойствах.

Свойство DeadLetterQueue выражает тип очереди недоставленных сообщений, который необходим клиенту. Значения перечисления указаны ниже:

None : клиенту не требуется очередь недоставленных сообщений;

System : для хранения сообщений, которые не удалось доставить, служит системная очередь недоставленных сообщений. Системная очередь недоставленных сообщений общая для всех приложений, работающих на компьютере.

Custom : для хранения сообщений, которые не удалось доставить, служит пользовательская очередь недоставленных сообщений, определяемая с помощью свойства CustomDeadLetterQueue. эта функция доступна только в Windows Vista. Используется, если приложению нужна собственная очередь недоставленных сообщений, независимая от других приложений, работающих на данном компьютере.

Свойство CustomDeadLetterQueue выражает конкретную очередь, которая используется как очередь недоставленных сообщений. эта проблема доступна только в Windows Vista.

В этом образце клиент отправляет службе пакет сообщений из области транзакции и задает неоправданно низкое значение срока жизни этих сообщений (около 2 секунд). Клиент также задает пользовательскую очередь недоставленных сообщений, в которую помещаются сообщения с истекшим сроком жизни.

Клиентское приложение может после этого выполнить чтение сообщений из очереди недоставленных сообщений и попытаться повторно отправить сообщение или исправить ошибку, из-за которой сообщение оказалось в очереди недоставленных сообщений, и отправить его. В этом образце клиент отображает сообщение об ошибке.

Связь с службой осуществляется в области транзакции. Служба выполняет чтение сообщений из очереди, выполняет операцию и отображает результаты операции. Также приложение создает очередь недоставленных сообщений.

В конфигурации клиента задается короткий срок, в течение которого сообщение должно достичь службы. Если за указанное время сообщение передать не удается, его срок жизни истекает и оно помещается в очередь недоставленных сообщений.

Приложение должно определить очередь, которая используется в качестве очереди недоставленных сообщений. Если очередь не задана, для недоставленных сообщений используется системная очередь недоставленных сообщений по умолчанию. В этом примере клиентское приложение задает собственную очередь недоставленных сообщений.

Реализация службы недоставленных сообщений ищет причину, по которой сообщение не было доставлено, и принимает меры по исправлению ошибки. Причина ошибки доставки сообщения содержится в двух перечислениях: DeliveryFailure и DeliveryStatus. Объект MsmqMessageProperty можно извлечь из объекта OperationContext, как показано в следующем образце кода.

В этом примере служба недоставленных сообщений повторно отправляет сообщение, если причина сбоя заключается в том, что время ожидания сообщения истекло. По всем остальным причинам отображается ошибка доставки, как показано в следующем образце кода:

В следующем образце показана конфигурация для недоставленного сообщения:

В этом образце нужно запустить три исполняемых файла, чтобы посмотреть, как работает очередь недоставленных сообщений для всех приложений: клиента, службы и службы недоставленных сообщений, которая выполняет чтение очереди недоставленных сообщений для каждого приложения и повторно отправляет сообщение службе. Это консольные приложения, отображающие вывод в окнах консоли.

Поскольку используется очередь, клиенту и службе не обязательно быть запущенными и работать одновременно. Можно запустить клиент, выключить его, а затем запустить службу, которая все равно получит сообщения клиента. Для создания очереди следует запустить и завершить службу.

При запуске клиент отображает сообщение:

Клиент попытался отправить сообщение, но короткое время жизни сообщения истекло, и оно попало в очередь недоставленных сообщений для каждого приложения.

После этого запускается служба недоставленных сообщений, которая выполняет чтение сообщения, отображает код ошибки и повторно отправляет сообщение службе.

Служба запускается, выполняет чтение повторно отправленного сообщения и обрабатывает его.

Настройка, сборка и выполнение образца

При первом запуске служба проверит наличие очереди. Если очередь отсутствует, служба ее создаст. Можно сначала запустить службу, чтобы создать очередь, либо создать ее с помощью диспетчера очередей MSMQ. Чтобы создать очередь в Windows 2008, выполните следующие шаги.

откройте диспетчер сервера в Visual Studio 2012.

Щелкните правой кнопкой мыши частные очереди сообщений и выберите создать, Частная очередь.

Введите в ServiceModelSamplesTransacted качестве имени новой очереди.

чтобы выполнить пример в конфигурации с одним или несколькими компьютерами соответствующим образом, замените localhost на полное имя компьютера и следуйте инструкциям в разделе запуск примеров Windows Communication Foundation.

Выполнение образца на компьютере, входящем в рабочую группу

Обеспечьте связь конечной точки с привязкой, задав атрибут bindingConfiguration конечной точки.

Перед выполнением образца убедитесь, что изменена конфигурация службы DeadLetterService, сервера и клиента.

Комментарии

Источник

Обзор очередей недоставленных сообщений служебной шины

Очереди служебной шины Azure и подписки на разделы предоставляют дополнительную очередь, которая называется очередью недоставленных сообщений. Очередь недоставленных сообщений не нужно создавать явным образом. Ее нельзя удалить, ею нельзя управлять отдельно от основной сущности.

В этой статье рассматриваются очереди недоставленных сообщений в служебной шине. Большую часть статьи иллюстрирует пример для очередей недоставленных сообщений, доступный на сайте GitHub.

Очередь недоставленных сообщений

Очередь недоставленных сообщений предназначена для хранения сообщений, которые невозможно доставить ни одному адресату либо не удалось обработать. Сообщение из очереди DLQ можно извлечь и изучить. Приложение с помощью оператора может исправить проблемы и повторно отправить это сообщение, зарегистрировать ошибку и предпринять действие по исправлению.

С точки зрения интерфейса API и протокола очередь DLQ очень похожа на другие очереди за исключением того, что сообщения могут отправляться только посредством операции недоставленного сообщения родительской сущности. Кроме того, в ней не отслеживается время жизни и из нее нельзя удалить сообщение. Очередь недоставленных сообщений полностью поддерживает доставку с блокировкой для просмотра и транзакционные операции.

Автоматическая очистка очереди DLQ не выполняется. Сообщения остаются в очереди DLQ, пока вы явно не извлечете их оттуда и не завершите недоставленное сообщение.

Количество сообщений в очереди недоставленных сообщений

Нет возможности получить количество сообщений в очереди недоставленных сообщений на уровне разделов. Это обусловлено тем, что эти сообщения не сохраняются на уровне разделов, если в служебной шине не происходило внутренних ошибок. Вместо этого, когда отправитель отправляет сообщение в раздел, оно за несколько миллисекунд передается в подписку этого раздела и уже не находится на уровне раздела. Это означает, что сообщения нужно просматривать в очереди недоставленных сообщений, сопоставленной с подпиской нужного раздела. В следующем примере Service Bus Explorer показывает, что в очереди недоставленных сообщений для подписки test1 есть 62 сообщения.

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Перемещение сообщений в очередь DLQ

В служебной шине есть несколько действий, которые приводят к отправке сообщений в очередь DLQ из самой системы обработки сообщений. Сообщения также могут явно перемещаться в очередь DLQ самим приложением. К недоставленным сообщениям добавляются следующие два свойства (причина и описание недоставленных сообщений). Приложения могут определять собственные коды для свойства причины недоставленных сообщений, однако система задает следующие значения.

Причина недоставленных сообщений.Описание ошибки недоставленных сообщений.
HeaderSizeExceededПревышен максимальный размер для этого потока.
TTLExpiredExceptionВремя жизни сообщения истекло, и оно было перемещено в очередь недоставленных. Дополнительные сведения см. в разделе Срок жизни.
Идентификатор сеанса имеет значение NULL.Сущность, поддерживающая сеансы, не допускает использование сообщений, идентификатор сеанса которых имеет значение null.
MaxTransferHopCountExceededМаксимальное количество допустимых прыжков при пересылке между очередями. Этот параметр имеет значение 4.
MaxDeliveryCountExceededExceptionMessageСообщение не удалось обработать после максимально возможного количества попыток доставки. Дополнительные сведения см. в разделе Максимальное число доставок.

Максимальное число доставок

Существует ограничение на число попыток доставки сообщений для очередей и подписок Служебной шины. Значение по умолчанию — 10. Всякий раз, когда сообщение доставляется с блокировкой для просмотра, но затем от него явно отказываются или срок действия блокировки истекает, значение счетчика увеличивается. Если число доставок превышает это ограничение, сообщение перемещается в очередь DLQ. Причина недоставленного сообщения в очереди DLQ имеет значение MaxDeliveryCountExceeded. Это поведение нельзя отключить, но можно увеличить максимальное число попыток.

Срок жизни

При включении функции недоставленных сообщений для очередей или подписок все сообщения с истекшим сроком действия перемещаются в очередь DLQ. Код причины недоставленного сообщения имеет значение TTLExpiredException.

Отложенные сообщения не удаляются и не перемещаются в очередь недоставленных сообщений после истечения срока их действия. В этом весь замысел.

Ошибки при обработке правил подписки

Если включить недоставленные сообщения для исключений оценки фильтра, то все ошибки, возникающие во время выполнения правила фильтра SQL для подписки, записываются в очередь DLQ вместе с проблемным сообщением. Не используйте этот параметр в рабочей среде, в которой не все типы сообщений имеют подписчиков.

Недоставленные сообщения на уровне приложения

Помимо системных функций работы с недоставленными сообщениями, приложения могут использовать очередь DLQ для явного отклонения недопустимых сообщений. Сюда могут относиться сообщения, которые невозможно нормально обработать из-за каких-либо системных проблем, сообщения с полезными данными неправильного формата или сообщения, не прошедшие проверку подлинности при использовании какой-либо схемы безопасности на уровне сообщений.

Недоставка сообщений в сценариях ForwardTo или SendVia

Сообщения отправляются в очередь недоставленных сообщений при следующих условиях:

Путь к очереди недоставленных сообщений

Вы можете обратиться к очереди недоставленных сообщений с помощью следующего синтаксиса:

Дальнейшие действия

См. раздел Включение функции недоставленных сообщений для очереди или подписки, чтобы узнать о различных способах настройки помещения сообщения в очередь недоставленных после окончания срока действия.

Источник

Отложенные ретраи силами RabbitMQ

Меня зовут Алексей Казаков, я техлид команды Клиентских коммуникаций в ДомКлике. В этой статье я хочу поделиться с вами «рецептом», который позволил нам реализовать отложенные ретраи при использовании брокера сообщений RabbitMQ

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Введение

В ДомКлик существенная часть взаимодействия между сервисами реализована асинхронно за счет брокера сообщений RabbitMQ. Типичная схема взаимодействия выглядит так.

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Обычно сервису B нужны результаты выполнения опубликованных задач. Для этого создаётся обратный RabbitMQ-exchange, в который сервис A публикует результаты, а другие сервисы посредством RabbitMQ-routing_key получают только нужные им данные. Но для нашего «рецепта» это будет не нужно.

Отличные руководства по RabbitMQ можно найти на их сайте.

Постановка проблемы

Наша команда занимается доставкой всевозможных СМС/пушей/писем до клиентов, и для этих целей мы используем сторонних провайдеров, которые не входят в зону нашей ответственности. В общем случае схема выглядит так. Сервис A синхронно взаимодействует по HTTP с внешним сервисом E. Иногда сервис E может испытывать проблемы и не отвечать/таймаутить/пятисотить. Если несколько HTTP-ретраев с возрастающей задержкой не помогают и сервис E по-прежнему отказывается корректно работать, то что делать с сообщением?

RabbitMQ позволяет сделать reject with requeue, что вернет задачу в очередь и она не потеряется. Проблема заключается в том, что эта же задача очень быстро (

100 раз в секунду) снова попадет в consumer, и так мы будем порождать лишнюю нагрузку на сервис E (реальный случай из практики).

Возможные решения

1) Хранить сообщение в памяти приложения, продолжая ретраить.

2) С помощью механизма RabbitMQ-dead_letter_exchange сохранять задачи до лучших времен в отдельной очереди мертвых задач и считывать их оттуда отдельным consumer-ом.

3) Сохранять таски в базе, откуда снова доставлять их в consumer по истечении таймаута.

Выбранное нами решение

Последний вариант привлекателен тем, что тот же самый consumer будет заниматься обработкой задач. Вот бы ещё избавиться от необходимости работать с базой, ведь «Лучший код — не написанный код».

К счастью, можно реализовать механизм отложенных ретраев исключительно средствами RabbitMQ.

Ниже приведу схему очередей, описание маршрута задачи и минимальный код на Python, который позволит вам воспроизвести схему.

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Все элементы схемы уже описаны ранее за исключением пути от dead_letter_queue в service_a_inner_exch. Такая «петля» получается за счет того, что для dead_letter_queue в качестве dead letter exchange мы указываем service_a_inner_exch. В этом и заключается основная идея. Мы зацикливаем путь сообщения, отправляя его после таймаута из dead_letter_queue снова в исходный exchange.

Количество «кругов», которые проходит одна задача, можно ограничить с помощью анализа заголовков, которые изменяются при прохождении dead letter exchange. Это будет показано в примере кода ниже.

Код написан на Python 3.6.2 с использованием библиотеки pika==0.10.0.

Если в settings.py вы укажете необходимые данные для подключения к RabbitMQ, то последовательный запуск consumer.py и publisher.py выдаст следующий лог:

Т.е. код создаст схему, показанную на рисунке, отправит одно сообщение в систему, попытается трижды его обработать и отбросит после двух ретраев.

Возможные улучшения. Разные таймауты

В качестве расширения функциональности предложенной схемы можно рассмотреть создание нескольких dead letter queue с разными таймаутами. После прохождения через через dead letter exchange:

Возможные улучшения. Несколько consumer-ов

Если у вас с service_a_inner_exch связано несколько очередей, предназначенных для разных consumer-ов, то предложенная схема должна быть доработана. Например, у вас есть еще один сервис A_another, читающий из очереди service_a_another_input_q, связанной с service_a_inner_exch. Тогда текущая «петля» отправит сообщение повторно в обе очереди, и оба сервиса получат его повторно. Чтобы этого избежать, можно завести отдельный exchange dead_inner_exch, как показано на рисунке ниже.

Dead letter queue что это. Смотреть фото Dead letter queue что это. Смотреть картинку Dead letter queue что это. Картинка про Dead letter queue что это. Фото Dead letter queue что это

Заключение

Описанное решение позволяет реализовать произвольное количество отложенных ретраев с равными промежутками между попытками. Для этого требуется минимальный дополнительный код, а вся логика по задержке и повторной доставке выполняется кластером RabbitMQ.

Эта схема успешно эксплуатируется примерно 7 месяцев, неоднократно спасала при проблемах с сервисом E и ни разу не потребовала ручного вмешательства в свою работу. Условия эксплуатации: RabbitMQ 3.6.12, 4 RPS в среднем, с пиками до 40 RPS.

Надеюсь, эта статья поможет какому-нибудь программисту крепче спать по ночам.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *