Elixir is a really awesome language that runs on the Erlang Virtual Machine (BEAM), so it gets all of Erlang's concurrency and distribution features for free.
To demonstrate this, I decided to build a proof of concept (PoC) of a Publisher-Subscriber system using only Elixir.




Coding

I will begin by building a Publisher. This module will be based on GenServer and will take care of subscriptions, unsubscriptions and notifications. But Elixir is a functional language with immutable data, so a module on its own keeps no mutable state. How will we know who is subscribed?

Elixir provides a module called Agent, which is designed for exactly this kind of problem: it wraps a piece of state in a process. I will create a SubscriptionManager based on Agent to track all the subscriptions.
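
Before wrapping it in a module, here is a minimal sketch of how an Agent holds state, using only the standard Agent functions. The key "subs_demo" and the variable names are made up for illustration.

```elixir
# Start an Agent whose state is an empty map.
{:ok, agent} = Agent.start_link(fn -> %{} end)

# Update the state: the function receives the current map and returns the new one.
:ok = Agent.update(agent, fn subs -> Map.put(subs, "subs_demo", self()) end)

# Read the state back: the function runs inside the Agent process.
subscribers = Agent.get(agent, fn subs -> subs end)
IO.inspect(Map.keys(subscribers))
```

The map itself is never mutated. The Agent process simply replaces its state with whatever the update function returns.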



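defmodule SubscriptionManager do
  # Starts the Agent linked to the caller, with an empty map as state.
  def start_link do
    Agent.start_link(fn -> %{} end)
  end

  # Starts the Agent without linking it to the caller.
  def start do
    Agent.start(fn -> %{} end)
  end

  # Returns the whole map of subscriptions.
  def get_all(agent) do
    Agent.get(agent, fn x -> x end)
  end

  # Returns the value stored under key, or nil if there is none.
  def get(agent, key) do
    Agent.get(agent, fn x -> Map.get(x, key) end)
  end

  # Stores value under key.
  def put(agent, key, value) do
    Agent.update(agent, fn x -> Map.put(x, key, value) end)
  end

  # Removes key and returns the value it had (nil if it was absent).
  def delete(agent, key) do
    Agent.get_and_update(agent, fn x -> Map.pop(x, key) end)
  end
end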
We could also improve this module using ETS, but let's leave that for another time ;).
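
For the curious, an ETS-backed variant could look roughly like the sketch below. This is not the code used in the rest of the post: the module name EtsSubscriptionManager and the table name :subscriptions are hypothetical, and the table is owned by whichever process calls new/0, so it disappears when that process exits.

```elixir
defmodule EtsSubscriptionManager do
  # Hypothetical ETS-backed counterpart of SubscriptionManager (a sketch).

  # Creates an unnamed, public table and returns its reference.
  def new do
    :ets.new(:subscriptions, [:set, :public])
  end

  # Returns all subscriptions as a map of name => pid.
  def get_all(table) do
    table |> :ets.tab2list() |> Map.new()
  end

  # Returns the value stored under key, or nil if there is none.
  def get(table, key) do
    case :ets.lookup(table, key) do
      [{^key, value}] -> value
      [] -> nil
    end
  end

  # Stores value under key, replacing any previous entry.
  def put(table, key, value) do
    true = :ets.insert(table, {key, value})
    :ok
  end

  # Removes key from the table.
  def delete(table, key) do
    true = :ets.delete(table, key)
    :ok
  end
end
```

Reads and writes then go straight to the table instead of through a single Agent process, which is the usual reason to reach for ETS.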

Once we have a module in charge of the subscriptions, we can develop the Publisher module:

defmodule Publisher do
  use GenServer

  # Server

  # Starts the SubscriptionManager and uses its pid as the GenServer state.
  def start do
    {:ok, subscription_manager} = SubscriptionManager.start
    GenServer.start(Publisher, subscription_manager)
  end

  @impl true
  def init(subscription_manager) do
    {:ok, subscription_manager}
  end

  @impl true
  def handle_cast({:subscribe, name, pid}, subscription_manager) do
    SubscriptionManager.put(subscription_manager, name, pid)
    {:noreply, subscription_manager}
  end

  def handle_cast({:unsubscribe, name}, subscription_manager) do
    SubscriptionManager.delete(subscription_manager, name)
    {:noreply, subscription_manager}
  end

  # Forwards the message to the event manager of every subscriber.
  def handle_cast({:notify, message}, subscription_manager) do
    subscription_manager
    |> SubscriptionManager.get_all()
    |> Enum.each(fn {_name, pid} -> GenEvent.notify(pid, message) end)

    {:noreply, subscription_manager}
  end

  # Client

  def subscribe(server, name, pid) do
    GenServer.cast(server, {:subscribe, name, pid})
  end

  def unsubscribe(server, name) do
    GenServer.cast(server, {:unsubscribe, name})
  end

  def notify(server, message) do
    GenServer.cast(server, {:notify, message})
    IO.puts "I have published: '#{message}'"
  end
end
Then we only need to develop our Subscriber module. It will be based on GenEvent and will handle received events in the simplest way we know: printing them.

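defmodule Subscriber do
  # Note: GenEvent is deprecated since Elixir 1.5 and compiles with warnings there.
  use GenEvent

  # Starts an event manager and attaches this module as its handler.
  def start do
    {:ok, pid} = GenEvent.start([])
    GenEvent.add_handler(pid, Subscriber, [])
    {:ok, pid}
  end

  # Called for every event sent with GenEvent.notify/2.
  def handle_event(message, state) do
    IO.puts "I have received: '#{message}'"
    {:ok, state}
  end
end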
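
Since GenEvent has been deprecated since Elixir 1.5, readers on a recent version may prefer a subscriber built on GenServer. The sketch below is an alternative under that assumption, not the code this post uses: the module name PlainSubscriber, the deliver/2 and received/1 helpers and the {:message, message} tuple are all made up for illustration.

```elixir
defmodule PlainSubscriber do
  # Hypothetical GenServer-based subscriber (a sketch, not the post's Subscriber).
  use GenServer

  # Starts the subscriber with an empty list of received messages.
  def start, do: GenServer.start(__MODULE__, [])

  # What a publisher would call instead of GenEvent.notify/2.
  def deliver(pid, message), do: GenServer.cast(pid, {:message, message})

  # Returns the messages received so far, oldest first.
  def received(pid), do: GenServer.call(pid, :received)

  @impl true
  def init(messages), do: {:ok, messages}

  @impl true
  def handle_cast({:message, message}, messages) do
    IO.puts("I have received: '#{message}'")
    {:noreply, [message | messages]}
  end

  @impl true
  def handle_call(:received, _from, messages) do
    {:reply, Enum.reverse(messages), messages}
  end
end
```

With this variant, the Publisher's notify clause would call PlainSubscriber.deliver(pid, message) for each stored pid. GenServer.cast works with pids on other nodes, so the distributed setup would stay the same.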

Testing

Erlang, and by extension Elixir, can connect several Erlang virtual machines into a cluster. I will use this capability on a single host, but it also works across hosts that can reach each other over the network.
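
At any point during the test, a few calls from the standard Node module show what a BEAM knows about the cluster. The variable names below are only illustrative.

```elixir
# Name of the current node; :nonode@nohost when started without --sname or --name.
this_node = Node.self()

# true only if the VM was started as a distributed node.
distributed? = Node.alive?()

# Other nodes this VM is currently connected to (empty until Node.connect succeeds).
connected = Node.list()

IO.inspect({this_node, distributed?, connected})
```

After the Node.connect calls in the steps that follow, running Node.list on node1 should show the other nodes.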

I will run three BEAMs, one for the publisher and two for the subscribers.



Node 1: The Publisher

Open a console and type:

$ iex --sname node1 --cookie pubsub -S mix
Now that we have an IEx session running, type the following:

iex(node1@host)> {:ok, manager} = Publisher.start
iex(node1@host)> :global.register_name('manager', manager)
I am registering the Publisher process globally under the name 'manager', so any process on any connected BEAM will be able to find it.

Node 2: The Subscriber

Open another console and type:

$ iex --sname node2 --cookie pubsub -S mix
Now that we have an IEx session running, type the following:

iex(node2@host)> Node.connect :'node1@host'
iex(node2@host)> manager = :global.whereis_name('manager')
iex(node2@host)> {:ok, subscriber} = Subscriber.start
iex(node2@host)> Publisher.subscribe(manager, 'subs_node2', subscriber)
First we connect to node1. Then, using the :global module, this process can look up the Publisher and subscribe to it, even if the Publisher runs on another host.
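
One thing to watch for: :global.whereis_name returns :undefined when the name is not known, which can happen if the lookup runs before the global registry has synchronised with node1. A defensive lookup could be sketched as follows. The module name PublisherLookup and the {:error, :not_registered} return value are my own choices for illustration.

```elixir
defmodule PublisherLookup do
  # Hypothetical helper (a sketch): looks up a globally registered process.
  def find(name) do
    # Ask :global to synchronise its name table with the connected nodes first.
    :global.sync()

    case :global.whereis_name(name) do
      :undefined -> {:error, :not_registered}
      pid when is_pid(pid) -> {:ok, pid}
    end
  end
end
```

In the session above, this would be used as {:ok, manager} = PublisherLookup.find('manager') in place of the bare :global.whereis_name call.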

Node 3: The Subscriber

$ iex --sname node3 --cookie pubsub -S mix
And type in IEx:

iex(node3@host)> Node.connect :'node1@host'
iex(node3@host)> manager = :global.whereis_name('manager')
iex(node3@host)> {:ok, subscriber} = Subscriber.start
iex(node3@host)> Publisher.subscribe(manager, 'subs_node3', subscriber)
Publish a message:
From any of the IEx sessions (node1, node2 or node3), type the following:

iex> Publisher.notify(manager, "Hello !!")
Both subscribers will receive the message, "Hello !!" in this case.


Conclusion

As you have seen, it is really easy to code a distributed system using Elixir.

You can find the code on my GitHub.