Elixir is a really awesome language which runs in the Erlang Virtual Machine (
BEAM
). So it takes advantage of all the concurrent and distribution features of Erlang by free.
In order to demonstrate it I have decided to develop a proof of concept (PoC) of a Publisher-Subscriber system only using Elixir.
In order to demonstrate it I have decided to develop a proof of concept (PoC) of a Publisher-Subscriber system only using Elixir.
Coding
I will begin building a Publisher. This module will be based on
GenServer
. It will take care of subscriptions, unsubscriptions and notifications. But we have to remember that Elixir as pure functional programming language does not maintain a state, how will be able to know who is subscribed?
Elixir provides a module called Agent which is prepared for this kind of issues. I will create a SubscriptionManager based on Agent in order to track all the subscriptions.
Elixir provides a module called Agent which is prepared for this kind of issues. I will create a SubscriptionManager based on Agent in order to track all the subscriptions.
defmodule SubscriptionManager do def start_link do Agent.start_link(fn -> %{} end) end def start do Agent.start(fn -> %{} end) end def get_all(agent) do Agent.get(agent, fn x -> x end) end def get(agent, key) do Agent.get(agent, fn x -> Map.get(x, key) end) end def put(agent, key, value) do Agent.update(agent, fn x -> Map.put(x, key, value) end) end def delete(agent, key) do Agent.get_and_update(agent, fn x -> Map.pop(x, key) end) end end
We can also improve this module using
ETS
but let's do it other time ;).
Once that we have a module on charge of the subscriptions, we can develop the Publisher module:
defmodule Publisher do use GenServer # Server def start do {:ok, subscription_manager} = SubscriptionManager.start GenServer.start(Publisher, subscription_manager) end def handle_cast({:subscribe, name, pid}, subscription_manager) do SubscriptionManager.put(subscription_manager, name, pid) {:noreply, subscription_manager} end def handle_cast({:unsubscribe, name}, subscription_manager) do SubscriptionManager.delete(subscription_manager, name) {:noreply, subscription_manager} end def handle_cast({:notify, message}, subscription_manager) do SubscriptionManager.get_all(subscription_manager) |> Enum.map(fn {key, value} -> GenEvent.notify(value, message) end) {:noreply, subscription_manager} end # Client def subscribe(server, name, pid) do GenServer.cast(server, {:subscribe, name, pid}) end def unsubscribe(server, name) do GenServer.cast(server, {:unsubscribe, name}) end def notify(server, message) do GenServer.cast(server, {:notify, message}) IO.puts "I have published: '#{message}'" end end
Then, we only need to develop our Subscriber module. It will be based on
GenEvent
and it will handle the received events in the easier way that we know:
Printing
.
defmodule Subscriber do use GenEvent def start do {:ok, pid} = GenEvent.start([]) GenEvent.add_handler(pid, Subscriber, []) {:ok, pid} end def handle_event(message, state) do IO.puts "I have received: '#{message}'" {:ok, state} end end
Testing
Erlang, and by extension Elixir, has the possibility of connecting Erlang Virtual Machines. I will use this capability within the same host but it will work with hosts that see each others.
I will run three BEAMs, one for the publisher and two for the subscribers.
I will run three BEAMs, one for the publisher and two for the subscribers.
Node 1: The Publisher
Open a console and type:
$ iex --sname node1 --cookie pubsub -S mix
Now that we have an IEX running type the following:
iex(node1@host)> {:ok, manager} = Publisher.start iex(node1@host)> :global.register_name('manager', manager)
I am registering the Publisher process globally using the name "manager" therefore any process from any connected BEAM will be able to find it.
Node 2: The Subscriber
Open another console and type:
iex --sname node2 --cookie pubsub -S mix
Now that we have an IEX running type the following:
iex(node2@host)> Node.connect :'node1@host' iex(node2@host)> manager = :global.whereis_name('manager') iex(node2@host)> {:ok, subscriber} = Subscriber.start iex(node2@host)> Publisher.subscribe(manager, 'subs_node2', subscriber)
Firstly we have to connect to the node1 and then using the :global module this process is able to subscribe to the Publisher that can be even in other host.
Node 3: The Subscriber
iex --sname node3 --cookie pubsub -S mix
And type in the IEX:
iex(node3@host)> Node.connect :'node1@host' iex(node3@host)> manager = :global.whereis_name('manager') iex(node3@host)> {:ok, subscriber} = Subscriber.start iex(node3@host)> Publisher.subscribe(manager, 'subs_node3', subscriber)
Publish a message:
Using whatever IEX (node1, node2 or node3) type the following:
Using whatever IEX (node1, node2 or node3) type the following:
iex> Publisher.notify(manager, "Hello !!")
The two subscriber will receive the sent message, "Hello !!" in this case.
Conclusion
As you have seen, it is really easy to code a distributed system using Elixir.
You can find the code in my github .
You can find the code in my github .