Как подключить Cloudflare

Этот пост также доступен на испанском языке.

Анонс Pub/Sub: программируемый обмен сообщениями на основе MQTT

Один из основных вопросов, который движет Platform Week, — «Как мы позволяем разработчикам создавать приложения с полным стеком на Cloudflare?». Используя Workers в качестве бессерверной среды для простого развертывания распределенных по умолчанию приложений, KV и Durable Objects для кэширования и координации, а также R2 в качестве нашего хранилища объектов с нулевой стоимостью исходящего трафика, мы продолжили обсуждать, что еще нам нужно создать, чтобы помочь разработчики создают новые приложения и/или переносят существующие на платформу разработчиков Cloudflare.

Имея это в виду, мы рады объявить о закрытой бета-версии Cloudflare Pub/Sub, программируемый шина сообщений, построенная на повсеместно распространенном и стандартном для отрасли протоколе MQTT, поддерживаемом сегодня десятками миллионов существующих устройств.

Вкратце, Pub/Sub позволяет вам:

И, вероятно, есть длинный список вещей, которые мы еще даже не предсказали. Мы видели, как разработчики создают невероятные вещи на основе Cloudflare Workers, и мы рады видеть, что они создают с помощью программируемой шины сообщений, такой как Pub/Sub.

Почему и что такое MQTT?

Если вы раньше не слышали о MQTT, вы можете быть удивлены, узнав, что это один из самых распространенных «протоколов обмена сообщениями», развернутых сегодня. Сегодня существуют десятки миллионов (как минимум!) устройств, говорящих на MQTT, от подключенных платежных терминалов до автономных транспортных средств, сотовых телефонов и даже видеоигр. Показания датчиков, телеметрия, финансовые транзакции и/или мобильные уведомления и сообщения — все это распространенные варианты использования MQTT, а гибкость протокола позволяет разработчикам находить компромиссы в отношении надежности, иерархии тем и постоянства в зависимости от их варианта использования.

Мы выбрали MQTT в качестве основы для Cloudflare Pub/Sub, поскольку мы верим в построение на основе открытых и доступных стандартов, как мы сделали, когда выбрали Service Worker API в качестве основы для Workers, и с нашим недавно объявленным участием в зимнем сообществе. Сгруппируйте вокруг API среды выполнения на стороне сервера. Мы также хотели предоставить существующим клиентам простой способ воспользоваться преимуществами масштабируемости и программируемости Cloudflare, а также гарантировать, что у разработчиков есть богатая экосистема клиентских библиотек на языках, с которыми они знакомы сегодня.

Однако помимо этого мы также считаем, что MQTT отвечает потребностям современной службы обмена сообщениями «публикация-подписка». Он имеет гибкие гарантии доставки, TLS для транспортного шифрования (никакого специального шифрования!), масштабируемую модель создания тем и подписки, расширяемые метаданные для каждого сообщения и, что важно, предоставляет четко определенную спецификацию с четкими сообщениями об ошибках.

Имея это в виду, мы ожидаем, что будет поддерживать гораздо больше «входов» в Pub/Sub: многие лучшие части MQTT могут быть абстрагированы от клиентов, которые могут захотеть общаться с нами через HTTP или WebSockets.

Строительные блоки

Учитывая возможность написания кода, который воздействует на каждое сообщение, опубликованное в Pub/Sub Broker, как это выглядит на практике?

Вот простой, но наглядный пример обработки сообщений Pub/Sub непосредственно в Worker. У нас есть клиенты (в данном случае платежные терминалы), которые сообщают данные о транзакциях, и мы хотим зафиксировать количество транзакций, обработанных в каждом регионе, чтобы мы могли отслеживать объемы транзакций с течением времени.

В частности, мы:

  1. Фильтровать по определенному префиксу темы сообщения, которые нам интересны
  2. Проанализируйте сообщение для определенной пары ключ: значение в качестве метрики
  3. Запишите эту метрику непосредственно в Workers Analytics Engine, нашу новую бессерверную службу аналитики временных рядов, чтобы мы могли напрямую запрашивать ее с помощью GraphQL.

Это избавляет нас от необходимости создавать и поддерживать внешнюю службу метрик, настраивать другую облачную службу или думать о том, как она будет масштабироваться: мы можем делать все это прямо в Cloudflare.


async function pubsub(
  messages: Array<PubSubMessage>,
  env: any,
  ctx: ExecutionContext
): Promise<Array<PubSubMessage>> {
  
  for (let msg of messages) {
    // Extract a value from the payload and write it to Analytics Engine
    // In this example, a transactionsProcessed counter that our clients are sending
    // back to us.
    if (msg.topic.startsWith(“/transactions/”)) {
      // This is non-blocking, and doesn’t hold up our message
      // processing.
      env.TELEMETRY.writeDataPoint({
        // We label this metric so that we can query against these labels
        labels: [`${msg.broker}.${msg.namespace}`, msg.payload.region, msg.payload.merchantId],
        metrics: [msg.payload.transactionsProcessed ?? 0]
      });
    }
  }

  // Return our messages back to the Broker
  return messages;
}

const worker = {
  async fetch(req: Request, env: any, ctx: ExecutionContext) {
    // Critical: you must validate the incoming request is from your Broker
    // In the future, Workers will be able to do this on your behalf for Workers
    // in the same account as your Pub/Sub Broker.
    if (await isValidBrokerRequest(req)) {

      // Parse the incoming PubSub messages
      let incomingMessages: Array<PubSubMessage> = await req.json();
      
      // Pass the message to our pubsub handler, and capture the returned
      // messages
      let outgoingMessages = await pubsub(incomingMessages, env, ctx);

      // Re-serialize the messages and return a HTTP 200 so our Broker
      // knows we’ve successfully handled them
      return new Response(JSON.stringify(outgoingMessages), { status: 200 });
    }

    return new Response("not a valid Broker request", { status: 403 });
  },
};

export default worker;

Затем мы можем запросить эти показатели напрямую, используя знакомый язык: SQL. Наш запрос использует записанные нами метрики и дает нам разбивку транзакций, обработанных нашими платежными устройствами, сгруппированных по продавцам (и опять же, все на Cloudflare):

SELECT
  label_2 as region,
  label_3 as merchantId,
  sum(metric_1) as total_transactions
FROM TELEMETRY
WHERE
  metric_1 > 0
  AND timestamp >= now() - 604800
GROUP BY
  region,
  merchantId
ORDER BY
  total_transactions DESC
LIMIT 10

Вы можете заменить или дополнить вызовы Analytics Engine любым количеством примеров:

Pub/Sub дает вам возможность получать данные в сеть Cloudflare, фильтровать, агрегировать и/или видоизменять их, а также отправлять обратно подписчикам — независимо от того, слушают ли они 10, 1000 или 10 000 человек по этой теме.

Куда мы направляемся?

Как мы часто любим говорить: мы только начинаем. Закрытая бета-версия Pub/Sub — это только начало нашего пути, и у нас есть длинный список возможностей, над которыми мы уже работаем.

Крайне важно, что один из наших приоритетов — охватить как можно больше спецификации MQTT v5.0, чтобы клиенты могли перенести существующие развертывания и заставить их «просто работать». Полезные возможности, такие как общие подписки, которые позволяют распределять нагрузку сообщений между многими подписчиками; групповые подписки (как одноуровневые, так и многоуровневые) для вариантов использования агрегации, более надежные гарантии доставки (QoS) и поддержка дополнительных режимов аутентификации (в частности, Mutual TLS) — это лишь некоторые из вещей, над которыми мы работаем.

Кроме того, мы сосредоточены на том, чтобы разработчикам было максимально удобно использовать Pub/Sub, и во время бета-тестирования мы будем:

Наша документация для разработчиков будет охватывать эти возможности по мере их появления.

Мы также понимаем, что ценообразование является важной частью опыта разработчиков, и стремимся обеспечить доступный и гибкий уровень бесплатного пользования. Мы хотим дать разработчикам возможность экспериментировать, создавать прототипы и решать проблемы, о которых мы еще не думали. Мы расскажем больше о ценах в ходе бета-тестирования.

Начиная

Если вы хотите начать использовать Pub/Sub, подпишитесь на закрытую бета-версию: мы планируем начать предоставление доступа в течение следующего месяца. Мы с нетерпением ждем отзывов от разработчиков и того, что они начинают создавать.

А пока ознакомьтесь с совершенно новой документацией для разработчиков Pub/Sub, чтобы понять, как Pub/Sub работает внутри, как работает протокол MQTT и как он интегрируется с Cloudflare Workers.

Что такое Cloudflare