Пост

Пишем простой чат с консольным интерфейсом используя трубно-ориентированное программирование с котами

Если в процессе изучения gRPC хотите попрактиковаться с Bidirectional Streaming (или так называемый двунаправленная потоковая передача данных), c запросами в рамках одного соединения, инициированием событий со стороны сервера, то создание простенького чата может быть отличным способом.

Проект будем писать на языке Scala с использованием библиотеки fs2-grpc. Будем использовать клиент-серверную архитектуру, где клиенты могут отправлять сообщения на сервер, который будет ретранслировать их всем подключенным клиентам.

gRPC

Но прежде чем начать, давайте вспомним, что такое gRPC и как он связан с HTTP/2 не углубляясь в подробности (на эту тему и так достаточно статей).

gRPC - это RPC-фреймворк (Remote Procedure Call), который позволяет создавать клиент-серверные приложения для обмена данными. gRPC использует под капотом протокол HTTP/2, который позволяет ускорить передачу данных, уменьшить объем передаваемых данных и снизить задержку. Важно упомянуть о том, что gRPC использует Protobuf чтобы определить методы и структуру сообщений с помощью специального языка описания интерфейсов, а затем сгенерировать код для работы с этими сообщениями на различных языках программирования. Protobuf обеспечивает эффективную сериализацию/десериализацию данных в компактный бинарный формат.

Механизм работы чата

Ограничения

Во первых, чтобы не усложнять проект, я решил делать чат консольным и не хранить сообщения на стороне сервера.

Bidirectional Streaming

Поговорим про механизм работы чата с Bidirectional Streaming в gRPC. Процесс обмена сообщениями будет работать таким образом, что сервер и клиент обмениваются потоками сообщений в рамках одного соединения. Клиент отправляет событие, сервер его получает, обрабатывает, а затем отправляет ответное событие. Клиент и сервер обмениваются сообщениями асинхронно. Таким образом, данные передаются между клиентом и сервером в реальном времени и в обе стороны.

Мультикастинг событий внутри сервера

Когда клиенты подключаются к серверу, каждый из них может отправлять события на сервер. Однако, возникает проблема - как переслать сообщения от одного клиента остальным подключенным клиентам.

Для решения этой проблемы можно использовать механизм мультикастинга с помощью топика. Топик - это объект, который позволяет отправлять сообщения одновременно нескольким подписчикам. То есть, если один клиент отправляет событие, то полученное событие на стороне сервера будет направлено на этот топик, а оттуда автоматически пересылается всем клиентам, подписанным на этот топик.

Для реализации мультикастинга я использовал Topic из библиотеки fs2 (Functional Streams for Scala).

Таким образом, визуально механизм взаимодействия клиентов в сервером выглядит примерно так.

Клиент генерирует событие и отправляет на сервер, который, в свою очередь, раскидывает это событие по клиентам

Реализация

Для языка Scala есть несколько библиотек для работы с gRPC. Я использую fs2-grpc, который является оберткой над ScalaPB и сделана на основе функциональной библиотеки для работы со стримами - fs2.

fs2-grpc поддерживает все типы RPC-вызовов - Unary, Server Streaming, Client Streaming и Bidirectional Streaming. Она также предоставляет механизмы обработки ошибок и управления ресурсами, такие как Resource и Bracket. fs2-grpc интегрируется со стеком функциональных библиотек для работы с эффектами (cats-effect, zio, monix). В моем примере используется Cats Effect 3.

Proto

И так, приступим. В первую очередь нужно накидать прото-файл, в котором опишем контракт взаимодействия клиента и сервера.

Создадим некоторый ChatService с методом eventsStream, у которого на входе и на выходе потоковые данные с типом Events (то есть будем события через стримы туда-сюда делать).

1
2
3
4
service ChatService {

  rpc eventsStream(stream Events) returns (stream Events) { }
}

Events содержит данные обернутые в тип события, которые могут быть инициированы как на стороне клиента, так и сервера (в нашем случае только на стороне клиентов).

1
2
3
4
5
6
7
8
message Events {

  oneof event {
    Login    client_login    = 1;
    Logout   client_logout   = 2;
    Message  client_message  = 3;
    Shutdown server_shutdown = 4;
  }

Реализация сервера

Ранее мы говорили, что сервер должен получать события от клиентов и транслировать их остальным клиентам.

После компиляции прото-файла будет сгенерирован базовый код для работы с gRPC, среди которого будет интерфейс ChatServiceFs2Grpc. Он должен быть имплементирован на стороне сервера. Моя реализация имеет следующий вид.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object ChatService {

  def apply[F[_]: Concurrent: Console](
      eventsTopic: Topic[F, Events]
  ): ChatServiceFs2Grpc[F, Metadata] = new ChatServiceFs2Grpc[F, Metadata] {

    val eventsToClients: Stream[F, Events] =
      eventsTopic
        .subscribeUnbounded
        .evalTap(event => Console[F].println(s"From topic: $event"))

    override def eventsStream(
        eventsFromClient: fs2.Stream[F, Events],
        ctx: Metadata
    ): fs2.Stream[F, Events] = {
      eventsToClients.concurrently(
        eventsFromClient
          .evalTap(event => Console[F].println(s"Event from client: $event"))
          .evalMap(eventsTopic.publish1)
      )
    }
  }
}

Мы видим метод eventsStream, который описывали в proto-файле. Из потока eventsFromClient получаем события от клиентов. На выходе отдаем некоторый поток событий eventsToClients. Если посмотреть выше, то видим, что eventsToClients это подписка на топик eventsTopic: Topic[F, Events], в который публикуются события от клиента для отправки остальным клиентам.

Сборка и запуск сервера

Собираем все компоненты, которые представляют собой основу серверного приложения.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object ChatServerApp extends IOApp {

  private def runServer(service: ServerServiceDefinition): IO[Nothing] = {
    NettyServerBuilder
      .forPort(50053)
      .keepAliveTime(5, TimeUnit.SECONDS)
      .addService(service)
      .resource[IO]
      .evalMap(server => IO(server.start()))
      .useForever
  }

  override def run(args: List[String]): IO[ExitCode] = for {
    topic <- Topic[IO, Events]
    serviceResource: Resource[IO, ServerServiceDefinition] =
      ChatServiceFs2Grpc.bindServiceResource[IO](ChatService(topic))
    _ <- serviceResource.use(runServer)
  } yield ExitCode.Success
}

В функции runServer создается и запускается новый сервер с помощью NettyServerBuilder, который прослушивает порт 50053. NettyServerBuilder предоставляется библиотекой gRPC для создания серверов, использующих Netty в качестве транспорта и позволяет настроить параметры сервера (порт, keepAliveTime и т.д.)

В методе run создается топик, который будет использоваться для мультикастинга событий по клиентам. Создаем инстанс сервиса ChatService и биндим его к серверу. Затем запускаем наш сервер.

1
$ sbt "runMain org.github.ainr.chat.server.ChatServerApp"

В итоге, когда сервер запущен, клиенты смогут подключаться к нему, отправлять сообщения и получать их в режиме реального времени.

Реализация клиента

Что должен делать клиент? Клиент может показаться чутка сложнее, но на самом деле тут тоже все просто. Клиент делает несколько простых вещей:

  • Читает ввод в консоль
  • Отправляет события серверу
  • Получает события от сервера и обрабатывает их
  • Печатает полученные сообщения в консоль

Со стороны клиента тоже все сделано на стримах (Stream).

Чтение ввода из консоли

Для чтения ввода из консоли снова прибегаем к помощи стримов. Создаем класс InputStream с методом read, который возвращает поток сообщений напечатанных клиентом - Stream[F, String].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object InputStream {

  def apply[F[_]: Async: Console](bufSize: Int): InputStream[F] = {

    new InputStream[F] {

      override def read: Stream[F, String] = {
        fs2.io
          .stdinUtf8(bufSize)
          .through(fs2.text.lines)
          .evalTap(erase)          // удалить из консоли ввод
          .filter(_.nonEmpty)      // фильтруем пустые строки
      }

      private def erase: PartialFunction[String, F[Unit]] = {
        _ => Console[F].print("\u001b[1A\u001b[0K")  // удаляет то, что мы напечатали в консоль путем ввода спец-символов
      }
    }
  }
}

По коду видно, что он берет поток символов, преобразует их в строки и фильтрует пустые. Магическим может показаться только лишь метод erase, который печатает что-то непонятное в консоль.

На самом деле никакой магии нет. Все, что он делает - это удаляет то, что мы напечатали в консоль путем ввода спец-символов ANSI чтобы сообщения не дублировались.

Логика клиента

Далее введенный пользователем в консоль текст нужно преобразовать в тип события Event и отправить серверу.

В целом, логика клиента довольно простая и описана путем композиции стримов в методе start. Здесь снова фигурирует chatService: ChatServiceFs2Grpc[F, Metadata] с методом eventsStream сгенерированный библиотекой fs2-grpc на вход которого отправляем события из консоли (InputStream), генерируемые пользователем.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object ChatClient {

  def apply[F[_]: Concurrent: Console](
      clientName: String,
      inputStream: InputStream[F],
      chatService: ChatServiceFs2Grpc[F, Metadata]
  ): ChatClient[F] = new ChatClient[F] {

    private val grpcMetaData = new Metadata() // empty

    override def start: F[Unit] = {
      chatService
        .eventsStream(
          login(clientName) ++ inputStream.read.through(handleInput),
          grpcMetaData
        )
        .through(processEvent)  // обрабатываем полученные события от сервера
        .through(writeToConsole)   // пишем в консоль
        .compile
        .drain
    }

private def login(clientName: String): fs2.Stream[F, Events] =
  fs2.Stream(Events(ClientLogin(Login(clientName))))

    // ...

Metadata в gRPC - это способ передачи дополнительных метаданных между клиентом и сервером, которые представляет собой пары ключ-значение и могут быть добавлены к любому запросу.

На выходе eventsStream ловим события с сервера, сгенерированные другими клиентами, обрабатываем их методом processEvent, который преобразовывает события в строки.

1
2
3
4
5
6
7
8
private def processEvent: Pipe[F, Events, String] =
  _.map { data =>
    data.event match {
      case event: ClientLogin   => s"${Color.Green(event.value.name).overlay(Bold.On)} entered the chat."      case event: ClientLogout  => s"${Color.Blue(event.value.name).overlay(Bold.On)} left the chat."      case event: ClientMessage => s"${Color.LightGray(s"${event.value.name}:").overlay(Bold.On)} ${event.value.message}"
      case _: ServerShutdown    => s"${Color.LightRed("Server shutdown")}"
      case unknown              => s"${Color.Red("Unknown event:")} $unknown"
    }
  }

Для форматированного вывода текста в консоли используется библиотека fansi от lihaoyi, предназначенная для работы с цветами и стилями текста в консольном приложении. Она позволяет добавлять цветовые и стилевые эффекты к тексту, что делает консольный вывод более информативным и привлекательным. Далее сообщения будут напечатаны в консоль методом writeToConsole.

Сборка и запуск клиента

Собираем все компоненты, которые представляет собой основу клиентского приложения.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
object ChatClientApp extends IOApp {

  private def buildChatService(channel: Channel): Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =
    ChatServiceFs2Grpc.stubResource[IO](channel)

  private def resources: Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =
    NettyChannelBuilder
      .forAddress("127.0.0.1", 50053)
      .usePlaintext()
      .resource[IO]
      .flatMap(buildChatService)

  override def run(args: List[String]): IO[ExitCode] =
    resources.use { chatServiceFs2Grpc =>
      ChatClient(
        args.headOption.getOrElse("Anonymous"),
        InputStream[IO](bufSize = 1024),
        chatServiceFs2Grpc
      ).start
    }.as(ExitCode.Success)
}

NettyChannelBuilder - это класс, предоставляемый библиотекой gRPC для создания клиентов, использующих Netty в качестве транспорта. Он позволяет настроить параметры клиента.

В функции buildChatService создается ресурс, который представляет собой клиент для обращения к серверу чата. Для его создания используется метод stubResource из ChatServiceFs2Grpc.

Запускаем клиент через sbt, передав в аргументы имя клиента.

1
$ sbt "runMain org.github.ainr.chat.client.ChatClientApp Username"

И можем общаться :)

Вместо заключения

Создание небольших, простых проектов - это отличный способ попрактиковаться и углубить свои знания в технологиях. Это может быть что-то, что вы можете написать быстро и без особых усилий, но в то же время дает возможность изучить какой-то новый аспект технологии или языка программирования.

Простые проекты могут быть очень разнообразными. Например, вы можете написать небольшой веб-сервер, создать небольшую игру, написать скрипт для автоматического сбора данных, или же написать чат на базе gRPC, как мы обсуждали ранее.

Преимущество создания небольших проектов заключается в том, что вы можете более глубоко изучить технологию и применить знания на практике. Вы также можете быстро увидеть результат своей работы и получить удовлетворение от завершения проекта.

Не бойтесь начинать с чего-то простого и постепенно увеличивать сложность - это поможет вам стать более опытным и уверенным программистом.

Исходники

Код проекта можно посмотреть на гитхабе - https://github.com/a-khakimov/simple-fs2-grpc-chat.

Авторский пост защищен лицензией CC BY 4.0 .