
Add ChatRoomServer - sdball/protohackers

Commit bdcfbfc from github.com/sdball/protohackers

https://protohackers.com/problem/3

Protohackers problem 3 was a fun one!

I opted to use Elixir’s Registry to hold the state of joined users. That means I was able to use Elixir process messages to send server and chat messages directly to each connected client process. Each client process was then simply responsible for delivering its own messages to its own connected socket.
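The pattern described above can be sketched in isolation. This is a minimal illustration and not the commit's code: the registry name `SketchRegistry` and the `{:registered, _}` / `{:delivered, _, _}` messages are made up for the example, and each "client" is a bare process that reports what it receives instead of writing to a socket.

```elixir
# Hypothetical registry name, used only in this sketch.
{:ok, _} = Registry.start_link(keys: :duplicate, name: SketchRegistry)

parent = self()

# Each "client" registers itself under the room key with its username as the
# value, then reports the first process message it receives back to the parent.
spawn_client = fn name ->
  spawn(fn ->
    {:ok, _} = Registry.register(SketchRegistry, "general", name)
    send(parent, {:registered, name})

    receive do
      message -> send(parent, {:delivered, name, message})
    end
  end)
end

# Start two clients and wait until each one has registered.
for name <- ["alice", "bob"] do
  spawn_client.(name)

  receive do
    {:registered, ^name} -> :ok
  end
end

# Relay a chat message from alice to every registered process except alice.
Registry.dispatch(SketchRegistry, "general", fn entries ->
  for {pid, name} <- entries, name != "alice" do
    send(pid, {:user_message, "alice", "hello"})
  end
end)

delivered =
  receive do
    {:delivered, name, message} -> {name, message}
  after
    1_000 -> :nothing_delivered
  end

IO.inspect(delivered)
# => {"bob", {:user_message, "alice", "hello"}}
```

Because the username is stored as the registry value, the sender can be filtered out inside the dispatch callback without any extra bookkeeping.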

The way I used the Registry also means I’m just a little ways away from supporting chat rooms. Right now all clients connect to a hardcoded “general” room. But with a few tweaks users could join and leave rooms while staying connected to the server itself.
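As a rough sketch of what those tweaks might look like, assuming the room name simply replaces the hardcoded key: the module `RoomSketch`, its functions, and the registry name `RoomSketchRegistry` are all hypothetical and do not appear in the commit.

```elixir
defmodule RoomSketch do
  # Hypothetical registry name for this sketch.
  @registry RoomSketchRegistry

  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Register the calling process under the room name, with its username as value.
  def join(room, username) do
    {:ok, _owner} = Registry.register(@registry, room, username)
    :ok
  end

  # Remove the calling process's entries for this room only; its entries in
  # other rooms are left alone, so the client stays "connected".
  def leave(room), do: Registry.unregister(@registry, room)

  # Usernames currently registered in the room, sorted.
  def members(room) do
    @registry
    |> Registry.lookup(room)
    |> Enum.map(fn {_pid, username} -> username end)
    |> Enum.sort()
  end
end

{:ok, _} = RoomSketch.start()
:ok = RoomSketch.join("general", "alice")
:ok = RoomSketch.join("elixir", "alice")
:ok = RoomSketch.leave("general")

IO.inspect(RoomSketch.members("general"))
# => []
IO.inspect(RoomSketch.members("elixir"))
# => ["alice"]
```

The broadcast functions would then take the room name as an argument instead of using the literal "general".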

One interesting piece is how I’m handling the chat input loop in receive_input/1. While a user is joined, their process constantly loops between receiving incoming chat messages from their TCP socket and receiving Elixir process messages, with a 100ms timeout on each. If we get a TCP message: send it as a chat message and loop. If we get a process message: send it to their socket and loop.
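Stripped down, the shape of that loop looks roughly like the sketch below. It is a simplified stand-in for receive_input/1 and not a copy of it: the module `LoopSketch`, the function `next_line/1`, and the `{:deliver, text}` message are assumptions made for the example, and the socket pair is created on a loopback port chosen by the OS.

```elixir
defmodule LoopSketch do
  # Returns the next line read from the socket, relaying any pending
  # process message to the socket while it waits.
  def next_line(socket) do
    case :gen_tcp.recv(socket, 0, 100) do
      {:ok, line} ->
        {:ok, String.trim_trailing(line)}

      {:error, :timeout} ->
        # Nothing on the socket within 100ms: check the mailbox instead.
        receive do
          {:deliver, text} -> :gen_tcp.send(socket, text <> "\n")
        after
          100 -> :ok
        end

        next_line(socket)

      {:error, reason} ->
        {:error, reason}
    end
  end
end

options = [mode: :binary, packet: :line, active: false]
{:ok, listen} = :gen_tcp.listen(0, options)
{:ok, port} = :inet.port(listen)
{:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, options)
{:ok, server_side} = :gen_tcp.accept(listen)

# Run the loop in its own process, as each connection does in the server.
task = Task.async(fn -> LoopSketch.next_line(server_side) end)

# A process message is relayed to the socket once a socket read times out.
send(task.pid, {:deliver, "[bob] hi"})
{:ok, relayed} = :gen_tcp.recv(client, 0, 1_000)

# A line from the client ends this sketch's loop and is returned.
:ok = :gen_tcp.send(client, "hello from alice\n")
result = Task.await(task)

IO.inspect(relayed)
# => "[bob] hi\n"
IO.inspect(result)
# => {:ok, "hello from alice"}
```

One consequence of this design is that a relayed message can wait for a socket read to time out before it is written, so delivery may lag by up to about 100ms.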

bdcfbfc369296f1bd29ec39dae89d708e003e41f on sdball/protohackers

added files

lib/protohackers/chat_room_server.ex

defmodule Protohackers.ChatRoomServer do
  @moduledoc """
  TCP-based chat room server implementation

  Each message is a single line of ASCII text terminated by a newline character
  ('\\n', or ASCII 10).

  Clients can send multiple messages per connection.

  All messages are raw ASCII text, not wrapped up in JSON or any other format.

  ## Joining the chat as a new user

  When a client connects to the server, it does not yet have a name and is not
  considered to have joined.

  The server will prompt the user by sending a single message asking for a name.

      Welcome to C.H.A.T. What name would you like?

  The first message from a client sets the user's name, which must contain
  at least 1 character, and must consist entirely of alphanumeric characters
  (uppercase, lowercase, and digits).

  Once the user has a name, they have joined the chat room and the server will
  announce their presence to other users.

  The server will send the new user a message that lists all present users'
  names, not including the new user, and not including any users who have
  already left. The wording of this message may vary but will lead with an
  asterisk `*` character.

      * The chat room: susan, mark, dave, amy

  The room list will be sent even if the room is empty.

  All subsequent messages from the server will be either chat messages
  originating from other connected users or join/leave notifications.

  ## Chat messages

  Once the naming/joining process is complete, every message a client sends to
  the server is treated as a chat message.

  The server will relay chat messages to all other connected clients in the
  following format:

      [name] message

  The message sender will NOT get an echoed copy of their own message.

  ## Notification of new user joins

  When a user joins the chat room, all other joined users will receive a message
  from the server informing them that a new user has joined.

  The new user joined message wording will vary but will lead with an asterisk
  `*` character and contain the user's name.

      * alice has joined the room

  ## Notification of joined user departures

  When a joined user is disconnected for any reason, other joined users will be
  notified of their departure.

  The user left message wording will vary but will lead with an asterisk `*`
  character and contain the user's name.

      * alice has left the room
  """

  use GenServer

  require Logger

  def start_link(port \\ 11238) do
    GenServer.start_link(__MODULE__, port)
  end

  defstruct [:listen_socket, :supervisor]

  @impl true
  def init(port) do
    {:ok, supervisor} = Task.Supervisor.start_link(max_children: 100)

    {:ok, _pid} =
      Registry.start_link(
        keys: :duplicate,
        name: Registry.ChatRoom,
        partitions: System.schedulers_online()
      )

    listen_options = [
      # receive data as binaries (instead of lists)
      mode: :binary,
      # explicitly retrieve packets by calling `:gen_tcp.recv/2`
      # (by default incoming packets would be sent as messages)
      active: false,
      # allow reusing the address if the listener crashes
      reuseaddr: true,
      # keep the peer socket open after the client closes its writes
      exit_on_close: false,
      # automatically split inputs by newline
      packet: :line,
      # increase default buffer to 10KB
      buffer: 1024 * 10
    ]

    with {:ok, listen_socket} <- :gen_tcp.listen(port, listen_options) do
      Logger.info("Starting ChatRoomServer on port #{port}")

      state = %__MODULE__{
        listen_socket: listen_socket,
        supervisor: supervisor
      }

      {:ok, state, {:continue, :accept}}
    else
      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_continue(:accept, %__MODULE__{} = state) do
    case :gen_tcp.accept(state.listen_socket) do
      {:ok, socket} ->
        Task.Supervisor.start_child(state.supervisor, fn ->
          handle_connection(socket)
        end)

        {:noreply, state, {:continue, :accept}}

      {:error, reason} ->
        Logger.error("[ChatRoomServer] Unable to accept connection #{inspect(reason)}")
        {:stop, reason, state}
    end
  end

  # -- core ------------------------------------------------------------------
  def broadcast_user_joined(username) do
    Registry.dispatch(Registry.ChatRoom, "general", fn users ->
      users
      |> Enum.reject(fn {_pid, name} ->
        name == username
      end)
      |> Enum.each(fn {pid, _name} ->
        send(pid, {:broadcast_user_joined, username})
      end)
    end)

    Logger.info("#{username} joined")
  end

  def list_previously_joined_users(socket, username) do
    other_users =
      Registry.lookup(Registry.ChatRoom, "general")
      |> Enum.map(&elem(&1, 1))
      |> Enum.reject(&(&1 == username))
      |> Enum.sort()
      |> Enum.join(", ")

    if other_users == "" do
      :gen_tcp.send(socket, "* You have joined the room\n")
    else
      :gen_tcp.send(socket, "* You have joined the room with: #{other_users}\n")
    end
  end

  def broadcast_user_left(username) do
    Registry.dispatch(Registry.ChatRoom, "general", fn users ->
      users
      |> Enum.reject(fn {_pid, name} ->
        name == username
      end)
      |> Enum.each(fn {pid, _name} ->
        send(pid, {:broadcast_user_left, username})
      end)
    end)

    Logger.info("#{username} left")
  end

  def chat_room(socket, username) do
    with {:ok, message} <- receive_input(socket) do
      user_message(username, message)
      chat_room(socket, username)
    else
      {:error, :unprintable} ->
        chat_room(socket, username)

      error ->
        error
    end
  end

  def user_message(username, message) do
    Registry.dispatch(Registry.ChatRoom, "general", fn users ->
      users
      |> Enum.reject(fn {_pid, name} ->
        name == username
      end)
      |> Enum.each(fn {pid, _name} ->
        send(pid, {:user_message, username, message})
      end)
    end)
  end

  def check_name(name) do
    with {:ok, name} <- check_name_format(name),
         {:ok, name} <- check_available_name(name) do
      {:ok, name}
    else
      error ->
        error
    end
  end

  def check_name_format(name) when byte_size(name) > 0 and byte_size(name) < 30 do
    if Regex.match?(~r|^[a-zA-Z0-9]+$|, name) do
      {:ok, name}
    else
      {:error, :rejected_name}
    end
  end

  def check_name_format(_name), do: {:error, :rejected_name}

  def check_available_name(name) do
    Registry.lookup(Registry.ChatRoom, "general")
    |> Enum.map(&elem(&1, 1))
    |> Enum.into(MapSet.new())
    |> MapSet.member?(name)
    |> case do
      true ->
        {:error, :name_already_taken}

      false ->
        {:ok, name}
    end
  end

  # -- server ----------------------------------------------------------------

  def handle_connection(socket) do
    case join_chat(socket) do
      {:ok, username} ->
        broadcast_user_joined(username)
        list_previously_joined_users(socket, username)
        chat_room(socket, username)
        broadcast_user_left(username)

      {:error, reason} ->
        Logger.error("[ChatRoomServer] failed to join #{inspect(reason)}")
    end

    :gen_tcp.close(socket)
  end

  def join_chat(socket) do
    :gen_tcp.send(socket, "Welcome! What name would you like?\n")

    with {:ok, name} <- receive_input(socket),
         {:ok, name} <- check_name(name),
         {:ok, _pid} <- Registry.register(Registry.ChatRoom, "general", name) do
      {:ok, name}
    else
      {:error, reason = :rejected_name} ->
        :gen_tcp.send(socket, "Sorry that is a rejected name\n")
        {:error, reason}

      {:error, reason = :name_already_taken} ->
        :gen_tcp.send(socket, "Sorry that name is already taken\n")
        {:error, reason}

      error ->
        error
    end
  end

  def receive_input(socket) do
    # wait up to 100ms for a line from the client's TCP socket
    with {:ok, input} <- :gen_tcp.recv(socket, 0, 100),
         true <- String.printable?(input) do
      {:ok, String.replace(input, ~r|\s*$|, "")}
    else
      false ->
        {:error, :unprintable}

      # nothing from the socket: wait up to 100ms for a process message,
      # relay it to the client's socket, then go back to reading the socket
      {:error, :timeout} ->
        receive do
          {:broadcast_user_left, username} ->
            :gen_tcp.send(socket, "* #{username} left\n")
            receive_input(socket)

          {:broadcast_user_joined, username} ->
            :gen_tcp.send(socket, "* #{username} joined\n")
            receive_input(socket)

          {:user_message, username, message} ->
            :gen_tcp.send(socket, "[#{username}] #{message}\n")
            receive_input(socket)
        after
          100 ->
            receive_input(socket)
        end

      # any other socket error (e.g. :closed) ends the loop
      error ->
        error
    end
  end
end

test/protohackers/chat_room_server_test.exs

defmodule Protohackers.ChatRoomServerTest do
  alias Protohackers.ChatRoomServer
  use ExUnit.Case

  describe "integration" do
    test "connecting users are prompted for a name" do
      {:ok, socket} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      {:ok, response} = :gen_tcp.recv(socket, 0)
      assert String.contains?(response, "name")
    end

    test "users can connect with an accepted name" do
      user = "stephen"
      {:ok, socket} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      {:ok, _response} = :gen_tcp.recv(socket, 0)
      :ok = :gen_tcp.send(socket, "#{user}\n")
      {:ok, response} = :gen_tcp.recv(socket, 0)
      assert String.starts_with?(response, "*")
      assert String.contains?(response, "joined the room")
    end

    test "users are shown the existing members of the room not including their user" do
      user1 = "stephen"
      {:ok, socket1} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      {:ok, _response} = :gen_tcp.recv(socket1, 0)
      :ok = :gen_tcp.send(socket1, "#{user1}\n")
      {:ok, response} = :gen_tcp.recv(socket1, 0)
      assert String.starts_with?(response, "*")
      assert String.contains?(response, "joined the room")
      assert !String.contains?(response, user1)

      user2 = "alanone"
      {:ok, socket2} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      {:ok, _response} = :gen_tcp.recv(socket2, 0)
      :ok = :gen_tcp.send(socket2, "#{user2}\n")
      {:ok, response} = :gen_tcp.recv(socket2, 0)
      assert String.starts_with?(response, "*")
      assert String.contains?(response, "joined the room")
      assert String.contains?(response, user1)
      assert !String.contains?(response, user2)
    end

    test "already joined users in the room are sent a notice for new users" do
      user1 = "stephen"
      {:ok, socket1} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket1, 0, 300)
      :gen_tcp.send(socket1, "#{user1}\n")
      :gen_tcp.recv(socket1, 0, 300)

      user2 = "alanone"
      {:ok, socket2} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket2, 0, 300)
      :gen_tcp.send(socket2, "#{user2}\n")
      :gen_tcp.recv(socket2, 0, 300)

      {:ok, message} = :gen_tcp.recv(socket1, 0, 300)
      assert message == "* #{user2} joined\n"

      # no corresponding message on user2's session
      {:error, :timeout} = :gen_tcp.recv(socket2, 0, 300)

      :gen_tcp.close(socket1)
      :gen_tcp.close(socket2)
    end

    test "users joining a room with existing users are shown the existing users" do
      user1 = "stephen"
      {:ok, socket1} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket1, 0, 300)
      :gen_tcp.send(socket1, "#{user1}\n")
      :gen_tcp.recv(socket1, 0, 300)

      user2 = "alanone"
      {:ok, socket2} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket2, 0, 300)
      :gen_tcp.send(socket2, "#{user2}\n")
      {:ok, join_response} = :gen_tcp.recv(socket2, 0, 300)
      assert join_response == "* You have joined the room with: #{user1}\n"

      :gen_tcp.close(socket1)
      :gen_tcp.close(socket2)
    end

    test "users can chat" do
      user1 = "stephen"
      {:ok, socket1} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket1, 0, 300)
      :gen_tcp.send(socket1, "#{user1}\n")
      :gen_tcp.recv(socket1, 0, 300)

      user2 = "alanone"
      {:ok, socket2} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket2, 0, 300)
      :gen_tcp.send(socket2, "#{user2}\n")
      :gen_tcp.recv(socket2, 0, 300)

      {:ok, _joined_message} = :gen_tcp.recv(socket1, 0, 300)

      # chat begins

      # message from user1
      :ok = :gen_tcp.send(socket1, "I think the MCP is getting out of hand\n")

      # appears for user2
      {:ok, received} = :gen_tcp.recv(socket2, 0, 300)
      assert received == "[#{user1}] I think the MCP is getting out of hand\n"

      # and not for user1
      {:error, :timeout} = :gen_tcp.recv(socket1, 0, 300)

      # message from user2
      message = "Don't worry, TRON will run independently. And watchdog the MCP as well.\n"
      :ok = :gen_tcp.send(socket2, message)

      # appears for user1
      {:ok, received} = :gen_tcp.recv(socket1, 0, 300)
      assert received == "[#{user2}] #{message}"

      # and not for user2
      {:error, :timeout} = :gen_tcp.recv(socket2, 0, 300)

      :gen_tcp.close(socket1)
      :gen_tcp.close(socket2)
    end

    test "when a user leaves other users are notified" do
      user1 = "stephen"
      {:ok, socket1} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket1, 0, 300)
      :gen_tcp.send(socket1, "#{user1}\n")
      :gen_tcp.recv(socket1, 0, 300)

      user2 = "alanone"
      {:ok, socket2} = :gen_tcp.connect('localhost', 11238, [:binary, active: false])
      :gen_tcp.recv(socket2, 0, 300)
      :gen_tcp.send(socket2, "#{user2}\n")
      :gen_tcp.recv(socket2, 0, 300)

      {:ok, message} = :gen_tcp.recv(socket1, 0, 300)
      assert message == "* #{user2} joined\n"

      :gen_tcp.close(socket2)

      {:ok, message} = :gen_tcp.recv(socket1, 0, 300)
      assert message == "* #{user2} left\n"
    end
  end

  describe "unit" do
    test "alphanumeric names are accepted" do
      accepted = ~w(someone word123 SOMEONE SomeOne a b c some0ne)

      for name <- accepted do
        assert {:ok, ^name} = ChatRoomServer.check_name_format(name)
      end
    end

    test "non-alphanumeric names are rejected" do
      rejected = [
        "someone!",
        "hello there",
        "@someone"
      ]

      for name <- rejected do
        assert {:error, :rejected_name} = ChatRoomServer.check_name_format(name)
      end
    end
  end
end

modified files

fly.toml

diff --git a/fly.toml b/fly.toml
index f1dad4a..0463b7f 100644
--- a/fly.toml
+++ b/fly.toml
@@ -46,6 +46,17 @@ script_checks = []
 handlers = []
 port = "11237"

+[[services]]
+http_checks = []
+internal_port = 11238
+processes = ["app"]
+protocol = "tcp"
+script_checks = []
+
+[[services.ports]]
+handlers = []
+port = "11238"
+
 [services.concurrency]
 hard_limit = 25
 soft_limit = 20

lib/protohackers/application.ex

diff --git a/lib/protohackers/application.ex b/lib/protohackers/application.ex
index f38e8a6..ca1ba1b 100644
--- a/lib/protohackers/application.ex
+++ b/lib/protohackers/application.ex
@@ -8,7 +8,8 @@ defmodule Protohackers.Application do
     children = [
       {Protohackers.EchoServer, 11235},
       {Protohackers.IsPrimeServer, 11236},
-      {Protohackers.AssetPriceServer, 11237}
+      {Protohackers.AssetPriceServer, 11237},
+      {Protohackers.ChatRoomServer, 11238}
     ]

     opts = [strategy: :one_for_one, name: Protohackers.Supervisor]
