diff --git a/README.md b/README.md index e1f53ee..8765dce 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,27 @@ repository and submit a pull request back to develop. * reject * tx * version + * Message Serialisation + * addr + * block + * getaddr + * getblocks + * getdata + * getheaders + * headers + * inv + * notfound + * ping + * pong + * tx + * version + * Common Structure Serialisation + * varint/varint[] + * varstring/varstring[] + * inventory vector + * network address + * txin/txout/outpoint + * block header * OTP Application / Full Node * Peer * Connection Pool/Acceptor and Handler @@ -70,29 +91,9 @@ repository and submit a pull request back to develop. * Transaction Queues * Event Model * Logging Strategy - * Common Structure Serialisation - * varint/varint[] - * varstring/varstring[] - * inventory vector - * network address - * txin/txout/outpoint - * block header * Message Serialisation - * addr - * alert - * block - * getaddr - * getblocks - * getdata - * getheaders - * headers - * inv - * notfound - * ping - * pong * reject - * tx - * version + * alert * OTP Application / Full Node * Server Layout and Deployment * Blockchain Bulk Storage and Index API diff --git a/config/config.exs b/config/config.exs index 6dfa82f..60cc694 100644 --- a/config/config.exs +++ b/config/config.exs @@ -21,4 +21,6 @@ use Mix.Config # Configuration from the imported file will override the ones defined # here (which is why it is important to import them last). # -# import_config "#{Mix.env}.exs" + +import_config "#{Mix.env}.exs" + diff --git a/config/dev.exs b/config/dev.exs new file mode 100644 index 0000000..49f73a0 --- /dev/null +++ b/config/dev.exs @@ -0,0 +1,6 @@ +use Mix.Config + +config :bitcoin, :node, [] + +config :exlager, + level: :info diff --git a/config/test.exs b/config/test.exs new file mode 100644 index 0000000..e626a7e --- /dev/null +++ b/config/test.exs @@ -0,0 +1,9 @@ +use Mix.Config + + +# TODO Logger.supresses unused variable warnings when it removes parts of the AST +# Unfortunately Lager doesn't do that. Would be nice to find some workaround. + +config :exlager, + level: :error + diff --git a/lib/bitcoin.ex b/lib/bitcoin.ex index 6046471..c03df78 100644 --- a/lib/bitcoin.ex +++ b/lib/bitcoin.ex @@ -1,2 +1,25 @@ defmodule Bitcoin do + use Application + + # See http://elixir-lang.org/docs/stable/elixir/Application.html + # for more information on OTP Applications + def start(_type, _args) do + import Supervisor.Spec + + # Start node only if :bitcoin,:node config section is present + # TODO this is not great, because when using Bitcon-Ex as a lib, + # there must a way to overwrite our dev default (which is node enabled) + children = case Application.fetch_env(:bitcoin, :node) do + :error -> + [] + {:ok, _node_config} -> + [ supervisor(Bitcoin.Node.Supervisor, []) ] + end + + # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Bitcoin.Supervisor] + Supervisor.start_link(children, opts) + end end + diff --git a/lib/bitcoin/models/peer.ex b/lib/bitcoin/models/peer.ex deleted file mode 100644 index cb973f3..0000000 --- a/lib/bitcoin/models/peer.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule Bitcoin.Models.Peer do - - defstruct ip_address: {0, 0, 0, 0} # IPV4 or IPV6 - - @type t :: %Bitcoin.Models.Peer{ - ip_address: tuple - } - -end \ No newline at end of file diff --git a/lib/bitcoin/node.ex b/lib/bitcoin/node.ex index a5f35f9..19f6cc0 100644 --- a/lib/bitcoin/node.ex +++ b/lib/bitcoin/node.ex @@ -1,26 +1,73 @@ defmodule Bitcoin.Node do - use Application + use GenServer - defmodule Subsystems do - use Supervisor + require Lager - def start_link do - Supervisor.start_link(__MODULE__, :ok, []) - end + @default_config %{ + listen_ip: '0.0.0.0', + listen_port: 8333, + max_connections: 8, + user_agent: "/Bitcoin-Ex:0.0.0/", + data_directory: Path.expand("~/.bitcoin-ex"), + services: <<1, 0, 0, 0, 0, 0, 0, 0>> # TODO probably doesn't belong to config + } + + @protocol_version 70002 + + + # Interface - @peer_subsystem_name Bitcoin.Node.Peers + def start_link, do: GenServer.start(__MODULE__, nil, name: __MODULE__) + def version_fields, do: GenServer.call(__MODULE__, :version_fields) + def config, do: GenServer.call(__MODULE__, :config) + def nonce, do: GenServer.call(__MODULE__, :nonce) + def height, do: 1 + + # Implementation + + def init(_) do + self() |> send(:initialize) + {:ok, %{}} + end - def init(:ok) do - children = [ - supervisor(@peer_subsystem_name, [[name: @peer_subsystem_name]]) - ] + def handle_info(:initialize, state) do + Lager.info "Node initialization" - supervise([], strategy: :one_for_one) + config = case Application.fetch_env(:bitcoin, :node) do + :error -> @default_config + {:ok, config} -> + @default_config |> Map.merge(config |> Enum.into(%{})) end + File.mkdir_p(config.data_directory) + + state = state|> Map.merge(%{ + nonce: Bitcoin.Util.nonce64(), + config: config + }) + + {:noreply, state} + end + + def handle_call(:config, _from, state), do: {:reply, state.config, state} + def handle_call(:nonce, _from, state), do: {:reply, state.nonce, state} + + def handle_call(:version_fields, _from, state) do + fields = %{ + height: height(), + nonce: state.nonce, + relay: true, + services: <<1, 0, 0, 0, 0, 0, 0, 0>>, + timestamp: timestamp(), + version: @protocol_version, + user_agent: state.config[:user_agent], + } + {:reply, fields, state} end - def start(_type, _args) do - Subsystems.start_link() + + def timestamp do + {megas, s, _milis} = :os.timestamp + round(1.0e6*megas + s) end -end \ No newline at end of file +end diff --git a/lib/bitcoin/node/network.ex b/lib/bitcoin/node/network.ex new file mode 100644 index 0000000..5632d59 --- /dev/null +++ b/lib/bitcoin/node/network.ex @@ -0,0 +1,36 @@ +defmodule Bitcoin.Node.Network do + + # TODO + # def connected? + # def connections + # def connect (this should be called on node start if node is started with some addnode option) + # some kind of health indicator? + # + # This module is also probably where Node will be requesting to fetch misisng inv / headers etc. + + alias Bitcoin.Node.Network + + @default_modules [ + # Addrs managager, keeps list of IPs to connect to + addr: Network.Addr, + # Peer connection handler, exchanges information with a single peer + peer: Network.Peer, + # Peers discovery - find IPs of peers to connect to if we have non in the database + discovery: Network.Discovery, + # Connection manager, accepts incoming connection, keeps track of all connected peers + connection_manager: Network.ConnectionManager + ] + + + def find_more_addrs do + modules[:discovery].begin_discovery() + end + + def modules do + case Application.get_env(:bitcoin, :node, :modules) do + nil -> @default_modules + list -> @default_modules |> Keyword.merge(list) + end + end + +end diff --git a/lib/bitcoin/node/network/addr.ex b/lib/bitcoin/node/network/addr.ex new file mode 100644 index 0000000..5ac2d17 --- /dev/null +++ b/lib/bitcoin/node/network/addr.ex @@ -0,0 +1,50 @@ +defmodule Bitcoin.Node.Network.Addr do + @moduledoc """ + Keeps database of known network nodes. + + Dummy version. Would be nice to switch to some dedicated struct from Protocol.NetworkAddress. + We may want to keep fields like last connection try times, last successful connection time, + maybe some score (e.g. higher for addrs from trusted seeds). Score could also help with blacklisting + nodes from which we detected abuse. + """ + use GenServer + + require Lager + + alias Bitcoin.Protocol.Types.NetworkAddress + + # Ignaring opts which contains modules list since we don't currently need it + def start_link(_opts \\ %{}), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + def add(%NetworkAddress{} = addr), do: GenServer.cast(__MODULE__, {:add, addr}) + def get, do: GenServer.call(__MODULE__, :get) + def count, do: GenServer.call(__MODULE__, :count) + def clear, do: GenServer.cast(__MODULE__, :clear) + + def handle_cast({:add, %NetworkAddress{} = addr}, addrs) do + Lager.debug("adding new network address #{addr.address |> :inet.ntoa}") + existing = addrs[addr.address] + + # If we already have this address, update timestamp if it's older + if (!existing || existing && existing.time < addr.time) && valid?(addr) do + {:noreply, addrs |> Map.put(addr.address, addr)} + else + {:noreply, addrs} + end + end + + def handle_cast(:clear, _addrs) do + {:noreply, %{}} + end + + def handle_call(:count, _from, addrs) do + {:reply, addrs |> Map.size, addrs} + end + + def handle_call(:get, _from, addrs) when addrs == %{}, do: {:reply, nil, addrs} + def handle_call(:get, _from, addrs) do + {:reply, addrs |> Map.values |> Enum.random, addrs} + end + + defp valid?(%NetworkAddress{} = na), do: na.time <= Bitcoin.Node.timestamp() + +end diff --git a/lib/bitcoin/node/network/connection_manager.ex b/lib/bitcoin/node/network/connection_manager.ex new file mode 100644 index 0000000..ce980d5 --- /dev/null +++ b/lib/bitcoin/node/network/connection_manager.ex @@ -0,0 +1,94 @@ +defmodule Bitcoin.Node.Network.ConnectionManager do + + # Reagent connection handler + defmodule ReagentHandler do + use Reagent + + def handle(%Reagent.Connection{socket: socket}) do + {:ok, pid} = Bitcoin.Node.Network.Peer.start(socket) + # Potential issue: + # If the connection gets closed after Peer.start but before switching the controlling process + # then probably Peer will never receive _:tcp_closed. Not sure if we need to care because + # it should just timout then + socket |> :gen_tcp.controlling_process(pid) + socket |> :inet.setopts(active: true) + :ok + end + end + + use GenServer + + require Lager + + alias Bitcoin.Protocol.Types.NetworkAddress + + def start_link(%{modules: _modules} = opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + def connect(ip, port), do: GenServer.cast(__MODULE__, {:connect, ip, port}) + def register_peer(), do: GenServer.call(__MODULE__, :register_peer) + + def init(opts) do + state = %{ + modules: opts.modules, + config: Bitcoin.Node.config(), + peers: [] + } + + {:ok, _pid} = Reagent.start_link(ReagentHandler, port: state.config[:listen_port]) + self() |> send(:periodical_connectivity_check) + {:ok, state} + end + + def handle_info(:periodical_connectivity_check, state) do + self() |> send(:check_connectivity) + self() |> Process.send_after(:periodical_connectivity_check, 10_000) + {:noreply, state} + end + + def handle_info(:check_connectivity, state) do + num_conn = length(state.peers) + max_conn = state.config[:max_connections] + Lager.info("[CM] #{num_conn} peers connected") + + # TODO we want to differentiate between outbound_max_connections and max_connections + # E.g. bitcoin-core behavior is that it won't have more than 8 outbound connections + # regardless of the max-connections setting. + # ALso, there's no hard limit on max_connections currently, Reagent limit should be + # dynamic plus we can go over limit if some peer connection is already in progress + # and we add another one + if num_conn < max_conn do + (0..(max_conn - num_conn)) |> Enum.each(fn _ -> + state |> add_peer() + end) + end + + {:noreply, state} + end + + def handle_info({:DOWN, _ref, :process, peer, _reason}, state) do + Lager.info("[CM] unregistered peer #{peer |> inspect}") + self() |> send(:check_connectivity) + {:noreply, state |> Map.put(:peers, state.peers |> List.delete(peer))} + end + + def handle_call(:register_peer, {peer, _ref}, state) do + Lager.info("[CM] registered peer #{peer |> inspect}") + state = state |> Map.put(:peers, [peer | state.peers]) + Process.monitor(peer) + {:reply, :ok, state} + end + + def handle_cast({:connect, ip, port}, state) do + Bitcoin.Node.Network.Peer.start(ip, port) + {:noreply, state} + end + + def add_peer(%{modules: modules}) do + case modules[:addr].get do + %NetworkAddress{address: ip, port: port} -> + connect(ip, port) + nil -> + Bitcoin.Node.Network.find_more_addrs() + end + end + +end diff --git a/lib/bitcoin/node/network/discovery.ex b/lib/bitcoin/node/network/discovery.ex new file mode 100644 index 0000000..ef1d61a --- /dev/null +++ b/lib/bitcoin/node/network/discovery.ex @@ -0,0 +1,59 @@ +defmodule Bitcoin.Node.Network.Discovery do + require Lager + use GenServer + + alias Bitcoin.Protocol.Types.NetworkAddress + + defmodule Strategy.DNS do + @moduledoc """ + Implements DNS node discovery. + + from Satoshi C Client (chainparams.cpp): + * alexykot.me + * bitcoin.petertodd.org + * bluematt.me + * bitcoin.schildbach.de + + https://en.bitcoin.it/wiki/Satoshi_Client_Node_Discovery#DNS_Addresses + """ + + require Lager + + @domains [ + [ "bitcoin.sipa.be", 'seed.bitcoin.sipa.be' ], # Pieter Wuille + [ "bluematt.me", 'dnsseed.bluematt.me' ], # Matt Corallo + [ "dashjr.org", 'dnsseed.bitcoin.dashjr.org' ], # Luke Dashjr + [ "bitcoinstats.com", 'seed.bitcoinstats.com' ], # Christian Decker + [ "xf2.org", 'bitseed.xf2.org' ], # Jeff Garzik + [ "bitcoin.jonasschnelli.ch", 'seed.bitcoin.jonasschnelli.ch' ] # Jonas Schnelli + ] + + def gather_peers(%{modules: modules} = _opts) do + + Enum.map(@domains, fn([seed_name, domain]) -> + Lager.info("Starting Peer Discovery via DNS for seed #{seed_name} at domain #{domain}") + Enum.each(:inet_res.lookup(domain, :in, :a), fn(ip) -> + %NetworkAddress{ + address: ip, + time: Bitcoin.Node.timestamp() + } |> modules[:addr].add + end) + end) + + end + + end + + def start_link(%{modules: _modules} = opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + + # Public Interface + def begin_discovery, do: GenServer.cast(__MODULE__, :begin_discovery) + + def handle_cast(:begin_discovery, %{discovery_started: true} = opts), do: {:noreply, opts} + def handle_cast(:begin_discovery, opts) do + Lager.info "Beginning Peer Discovery Process" + Strategy.DNS.gather_peers(opts) + {:noreply, opts |> Map.put(:discovery_started, true)} + end + +end diff --git a/lib/bitcoin/node/network/peer.ex b/lib/bitcoin/node/network/peer.ex new file mode 100644 index 0000000..f39f74b --- /dev/null +++ b/lib/bitcoin/node/network/peer.ex @@ -0,0 +1,271 @@ +defmodule Bitcoin.Node.Network.Peer do + @moduledoc """ + Connection handler. Manages peer state, sends mesasges, responds to pings etc. + + TODO maybe we should consider splitting connection and peer to separate genservers. + This will double number of processes, but it shouldn't be a problem below a few hundred + thousad peers. The problem is that API for accessing it from the outside may be a bit + more complicated (connection should be a master to peer, but we probably rather want to + call higher level functions on peer) + + TODO close connection if there's no successful handshake within specific time + """ + + use GenServer + require Lager + + alias Bitcoin.Node + alias Bitcoin.Protocol.Messages + alias Bitcoin.Protocol.Types.NetworkAddress + + @ping_timeout 30_000 # 30 seconds + @ping_frequency 600_000 # 10 minutes + + # Initialize Peer asking it to make connection to specific + def start(socket), do: GenServer.start(__MODULE__, socket) + def start(ip, port \\ 8333), do: GenServer.start(__MODULE__, %{ip: ip, port: port}) + + + # + # Init + # + + # Initialziation with ip and port, i.e. try to connect to specified addr + def init(%{ip: ip, port: port}) do + state = %{ + ip: ip, + port: port, + status: :connecting, + direction: :out, + buffer: "" + } + + # Connect may take a long time, we want init to finish as fast as possible + self() |> send(:connect) + + {:ok, state} + end + + # Initialize with existing socket (inbound connection) + def init(socket) do + {:ok, {ip, port}} = :inet.peername(socket) + + state = %{ + ip: ip, + port: port, + direction: :in, + status: :connecting, + socket: socket, + buffer: "" + } + + self() |> send(:handshake) + + {:ok, state} + end + + # + # Internal messages + # + + # Initialize TCP connection + def handle_info(:connect, %{ip: ip, port: port} = state) do + Lager.info "Connecting to #{ip |> :inet.ntoa}:#{port}" + case :gen_tcp.connect(ip, port, [:binary, active: true]) do + # Successful connection + {:ok, socket} -> + self() |> send(:handshake) + {:noreply, state |> Map.put(:socket, socket)} + # Timout while trying to connect + {:error, :etimedout} -> + state |> disconnect(:connection_timeout) + # Connection error + {:error, _} -> + state |> disconnect(:connection_error) + end + end + + # Initialize handshake by sending the VERSION packet + def handle_info(:handshake, state) do + node_config = Node.config() + + pkt = %Messages.Version{ + address_of_receiving_node: %NetworkAddress{ + address: state.ip |> ip_to_inet, + port: state.port, + }, + address_of_sending_node: %NetworkAddress{ + address: node_config.listen_ip |> ip_to_inet, + port: node_config.listen_port, + services: node_config.services, + }, + } + |> Map.merge(Bitcoin.Node.version_fields()) + |> Bitcoin.Protocol.Message.serialize + + :ok = state.socket |> :gen_tcp.send(pkt) + {:noreply, state} + end + + # Periodically send PING and ensure PONG is received + def handle_info(:periodic_ping, state) do + nonce = Bitcoin.Util.nonce64() + %Messages.Ping{nonce: nonce} |> send_message(state) + + state = state + |> Map.put(:last_ping_nonce, nonce) + |> Map.put(:last_ping_time, Bitcoin.Util.militime()) + + self() |> Process.send_after(:check_ping_response, @ping_timeout) + self() |> Process.send_after(:periodic_ping, @ping_frequency) + + {:noreply, state} + end + + # Check if we got an answer to our ping message + # We set last_ping_nonce to 0 when PONG with the proper nonce is received, + # so if it is set to zero, it means everything is ok + def handle_info(:check_ping_response, %{last_ping_nonce: 0} = state), do: {:noreply, state} + # last_ping_nonce != 0 which means we did not receive the PONG + def handle_info(:check_ping_response, state), do: state |> disconnect(:ping_timout) + + # + # Message handlers + # + + def handle_info({:msg, %Messages.Verack{}}, state) do + # We consider connection to be established when we receive VERSION packet and validate it, + # so we don't really care about the VERACK + {:noreply, state} + end + + def handle_info({:msg, %Messages.Version{} = version}, state) do + state |> debug("=> VERSION #{version |> inspect}") + + case validate_version(version) do + :ok -> + %Messages.Verack{} |> send_message(state) + {:noreply, state |> Map.put(:version, version) |> handle_connected} + _ -> + state |> disconnect(:version_mismatch) + end + end + + def handle_info({:msg, %Messages.Ping{nonce: nonce}}, state) do + state |> debug("=> PING") + %Messages.Pong{nonce: nonce} |> send_message(state) + {:noreply, state} + end + + def handle_info({:msg, %Messages.Pong{nonce: nonce}}, state) do + state |> debug("=> PONG") + + state = cond do + nonce == state[:last_ping_nonce] -> + state + |> Map.put(:ping_latency, Bitcoin.Util.militime() - state[:last_ping_time]) + |> Map.put(:last_ping_nonce, 0) + true -> state + end + + {:noreply, state} + end + + def handle_info({:msg, %Messages.GetHeaders{} = _msg}, state) do + state |> debug("=> GET HEADERS") + {:noreply, state} + end + + def handle_info({:msg, %Messages.Inv{inventory_vectors: inventory_vectors} = _msg}, state) do + state |> debug("=> INV #{inventory_vectors |> inspect}") + #Lager.info "#{ip |> inspect} <= I WANT IT ALL " + #%Messages.GetData{ + #inventory_vectors: inventory_vectors |> Enum.filter(fn iv -> iv.reference_type == :msg_tx end) + #}|> send_message(state) + {:noreply, state} + end + + def handle_info({:msg, msg}, state) do + state |> debug("=>? #{msg |> inspect}") + {:noreply, state} + end + + # + # TCP stuff + # + + # FIXME abuse vector + # When no message is found ini the buffer we will continue to append it indefinitely (running out of memory eventually) + # E.g. fix Massage.parse_stream should separate case of incomplete message from message not found + def handle_info({:tcp, _port, data}, state) do + #state |> debug(">> #{data |> Base.encode16}") + state = state |> Map.put(:buffer, process_buffer(state[:buffer] <> data)) + {:noreply, state} + end + + def handle_info({:tcp_closed, _port}, state) do + state + |> debug("connection closed") + |> disconnect(:tcp_closed) + end + + + def process_buffer(buffer) when byte_size(buffer) < 24, do: buffer # 24 is the header size + def process_buffer(buffer) do + [msg, remaining] = buffer |> Bitcoin.Protocol.Message.parse_stream + case msg do + nil -> remaining + _ -> + self() |> send({:msg, msg.payload.message}) + remaining |> process_buffer + end + end + + def send_message(msg, state) do + data = msg |> Bitcoin.Protocol.Message.serialize + state.socket |> :gen_tcp.send(data) + :ok + end + + # Convert provided ip address to the ip tuple + def ip_to_inet({_,_,_,_} = inet), do: inet # IPv4 + def ip_to_inet({_,_,_,_,_,_,_,_} = inet), do: inet # IPv6 + def ip_to_inet(ip), do: ({:ok, _inet} = ip |> :inet.parse_address) |> elem(1) + + + # Called after a successful handshake. + defp handle_connected(state) do + :ok = Node.Network.modules()[:connection_manager].register_peer() + self() |> send(:periodic_ping) + state + |> Map.put(:status, :connected) + |> debug("successfully connected") + end + + defp debug(%{ip: ip, port: port, direction: direction} = state, msg) do + Lager.debug "[#{direction}] #{ip |> :inet.ntoa}:#{port} #{msg}" + state + end + + defp disconnect(state, reason \\ :none) do + Lager.info "#{state.ip |> :inet.ntoa} disconnected :#{reason}" + {:stop, :normal, state |> Map.put(:status, :disconnected)} + end + + # Validate version packet received from the peer + # Return :ok, or {:error, reason} tuple with the first encountered error + defp validate_version(%Messages.Version{} = version) do + [ + # Nonce in the VERSION packet is used to detect self connections + fn version -> if version.nonce != Bitcoin.Node.nonce(), do: :ok, else: {:error, :self_connection} end, + + # Check if the timestamp difference is below 1 hour + fn version -> if abs(Bitcoin.Node.timestamp() - version.timestamp) < 3600, do: :ok, else: {:error, :incorrect_timestamp} end + ] + |> Bitcoin.Util.run_validations(version) + end + +end + + + diff --git a/lib/bitcoin/node/network/supervisor.ex b/lib/bitcoin/node/network/supervisor.ex new file mode 100644 index 0000000..9002414 --- /dev/null +++ b/lib/bitcoin/node/network/supervisor.ex @@ -0,0 +1,21 @@ +defmodule Bitcoin.Node.Network.Supervisor do + use Supervisor + + require Lager + + def start_link do + Supervisor.start_link(__MODULE__, name: __MODULE__) + end + + def init(_) do + Lager.info "Starting Node subsystems" + modules = Bitcoin.Node.Network.modules() + children = + [:addr, :discovery, :connection_manager] + |> Enum.map(fn name -> modules[name] end) + |> Enum.map(fn m -> worker(m, [%{modules: modules}]) end) + + children |> supervise(strategy: :one_for_one) + end + +end diff --git a/lib/bitcoin/node/peers.ex b/lib/bitcoin/node/peers.ex deleted file mode 100644 index 49e6c75..0000000 --- a/lib/bitcoin/node/peers.ex +++ /dev/null @@ -1,20 +0,0 @@ -defmodule Bitcoin.Node.Peers do - use Supervisor - - @discovery_service Bitcoin.Node.Peers.Discovery - @peer_connection_pool_service Bitcoin.Node.Peers.ConnectionPool - - def start_link(_opts) do - Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) - end - - def init(:ok) do - children = [ - # FIXME: For some reason, under newer versions of the dependencies the project specifies, the Reagent socket acceptor is broken which prevents our other tests from running. So, for now, this is disabled. - # supervisor(Reagent, [@peer_connection_pool_service, [name: @peer_connection_pool_service, port: 0]]), # the OS will pick a port on which we should listen - supervisor(@discovery_service, [[name: @discovery_service, peer_connection_pool: @peer_connection_pool_service]]) - ] - - supervise(children, strategy: :one_for_one) - end -end \ No newline at end of file diff --git a/lib/bitcoin/node/peers/connection_pool.ex b/lib/bitcoin/node/peers/connection_pool.ex deleted file mode 100644 index c40a8c8..0000000 --- a/lib/bitcoin/node/peers/connection_pool.ex +++ /dev/null @@ -1,41 +0,0 @@ -defmodule Bitcoin.Node.Peers.ConnectionPool do - use Reagent - - def start(connection) do - GenServer.start(__MODULE__, connection, name: __MODULE__) - end - - use GenServer - - def init(connection) do - {:ok, connection} - end - - require Lager - - def add_peer(peer) do - Lager.info "Peer connection pool received request to add peer: #{inspect(peer)}" - end - - # Server - - # this message is sent when the socket has been completely accepted and the - # process has been made owner of the socket, you don't need to wait for it - # when implementing handle because it's internally handled - def handle_info({ Reagent, :ack }, connection) do - Lager.info "New Peer Connection" - connection |> Socket.active!() - { :noreply, connection } - end - - def handle_info({ :tcp, _, data }, connection) do - connection |> Socket.Stream.send!(data) - { :noreply, connection } - end - - def handle_info({ :tcp_closed, _ }, connection) do - Lager.info "Closed Peer Connection" - { :stop, :normal, connection } - end - -end \ No newline at end of file diff --git a/lib/bitcoin/node/peers/discovery.ex b/lib/bitcoin/node/peers/discovery.ex deleted file mode 100644 index 116330a..0000000 --- a/lib/bitcoin/node/peers/discovery.ex +++ /dev/null @@ -1,64 +0,0 @@ -defmodule Bitcoin.Node.Peers.Discovery do - require Lager - use GenServer - - alias Bitcoin.Models.Peer - - def start_link(opts) do - GenServer.start_link(__MODULE__, {:ok, opts}, opts) - end - - def init({:ok, opts}) do - # begin_discovery(opts[:peer_connection_pool]) - { :ok, opts } - end - - @moduledoc """ - Implements DNS node discovery. - - from Satoshi C Client (chainparams.cpp): - * alexykot.me - * bitcoin.petertodd.org - * bluematt.me - * bitcoin.schildbach.de - - https://en.bitcoin.it/wiki/Satoshi_Client_Node_Discovery#DNS_Addresses - """ - defmodule Strategy.DNS do - - require Lager - - @domains [ - [ "bitcoin.sipa.be", 'seed.bitcoin.sipa.be' ], # Pieter Wuille - [ "bluematt.me", 'dnsseed.bluematt.me' ], # Matt Corallo - [ "dashjr.org", 'dnsseed.bitcoin.dashjr.org' ], # Luke Dashjr - [ "bitcoinstats.com", 'seed.bitcoinstats.com' ], # Christian Decker - [ "xf2.org", 'bitseed.xf2.org' ], # Jeff Garzik - [ "bitcoin.jonasschnelli.ch", 'seed.bitcoin.jonasschnelli.ch' ] # Jonas Schnelli - ] - - def gather_peers(peer_pool) do - - Enum.map(@domains, fn([seed_name, domain]) -> - Lager.info("Starting Peer Discovery via DNS for seed #{seed_name} at domain #{domain}") - Enum.each(:inet_res.lookup(domain, :in, :a), fn(peer) -> - peer_pool.add_peer(%Peer{ip_address: peer}) - end) - end) - - end - - end - - # Public Interface - def begin_discovery(peer_connection_pool) do - GenServer.cast(__MODULE__, {:begin_discovery, peer_connection_pool}) - end - - def handle_cast({:begin_discovery, peer_connection_pool}, state) do - Lager.info "Beginning Peer Discovery Process" - Strategy.DNS.gather_peers(peer_connection_pool) - {:noreply, state} - end - -end \ No newline at end of file diff --git a/lib/bitcoin/node/supervisor.ex b/lib/bitcoin/node/supervisor.ex new file mode 100644 index 0000000..055cd69 --- /dev/null +++ b/lib/bitcoin/node/supervisor.ex @@ -0,0 +1,20 @@ +defmodule Bitcoin.Node.Supervisor do + use Supervisor + + require Lager + + def start_link do + Supervisor.start_link(__MODULE__, :ok, name: __MODULE__) + end + + def init(:ok) do + Lager.info "Starting Node subsystems" + children = [ + worker(Bitcoin.Node, []), + supervisor(Bitcoin.Node.Network.Supervisor, []) + ] + + children |> supervise(strategy: :one_for_one) + end + +end diff --git a/lib/bitcoin/protocol/message.ex b/lib/bitcoin/protocol/message.ex index 5bf3282..f2880b0 100644 --- a/lib/bitcoin/protocol/message.ex +++ b/lib/bitcoin/protocol/message.ex @@ -4,6 +4,10 @@ defmodule Bitcoin.Protocol.Message do https://en.bitcoin.it/wiki/Protocol_documentation#Message_structure """ + alias Bitcoin.Protocol.Messages + alias Bitcoin.Protocol.Message.Payload + alias Bitcoin.Protocol.Message.Header + defimpl String.Chars, for: Bitcoin.Protocol.Message do @spec to_string(Message) :: String.t @@ -25,17 +29,15 @@ defmodule Bitcoin.Protocol.Message do end - defstruct header: Bitcoin.Protocol.Message.Header, - message: Bitcoin.Protocol.Message.Payload + defstruct header: Header, + payload: Payload @type t :: %{ - header: Bitcoin.Protocol.Message.Header.t, - message: Bitcoin.Protocol.Message.Payload.t + header: Header.t, + payload: Payload.t } - alias Bitcoin.Protocol.Messages - - @command_handlers %{ + @commands %{ "addr" => Messages.Addr, "alert" => Messages.Alert, "block" => Messages.Block, @@ -54,80 +56,9 @@ defmodule Bitcoin.Protocol.Message do "version" => Messages.Version } - defmodule Payload do - - defimpl String.Chars, for: Payload do - @spec to_string(Payload.t) :: String.t - def to_string(data) do - """ - parsed data: - #{data.payload |> String.Chars.to_string()} - raw data: - #{"0x" <> Base.encode16(data.raw_data)} - """ |> String.strip() - end - end - - defstruct raw_data: <<>>, - message: <<>> - - @type t :: %Payload{ - raw_data: binary, - message: binary - } - - def parse(command, data) do - message = case Bitcoin.Protocol.Message.handler(command) do - # Unrecognized message - nil -> <<>> - # Parse using message specific module - handler -> handler.parse(data) - end - - %Payload{ - raw_data: data, - message: message - } - end - - end + @message_types @commands |> Map.values() + @command_names @commands |> Map.keys() - defmodule Header do - - @known_network_identifiers %{ - main: <<0xF9, 0xBE, 0xB4, 0xD9>>, - testnet: <<0xFA, 0xBF, 0xB5, 0xDA>>, - testnet3: <<0x0B, 0x11, 0x09, 0x07>>, - namecoin: <<0xF9, 0xBE, 0xB4, 0xFE>> - } - - defstruct network_identifier: 0, - command: <<>>, - payload_size_bytes: 0, - checksum: 0 - - @type t :: %Header{ - network_identifier: non_neg_integer, - command: String.t, - payload_size_bytes: non_neg_integer, - checksum: non_neg_integer # sha256(sha256(payload)) first four bytes - } - - def parse(<>) do - - %Header{ - network_identifier: network_identifier, - command: command |> String.trim_trailing(<<0>>), - payload_size_bytes: payload_size_bytes, - checksum: checksum - } - end - - end @doc """ Reads and deserialises bitcoin message in serialised format and returns the parsed result @@ -141,16 +72,76 @@ defmodule Bitcoin.Protocol.Message do header = Header.parse(raw_header) - %{ + %__MODULE__{ header: header, payload: Payload.parse(header.command, payload) } end + def parse_stream(message) do + + << + raw_header :: bytes-size(24), + data :: binary + >> = message + + header = Header.parse(raw_header) + + if byte_size(data) < header.payload_size_bytes do + [nil, message] + else + size = header.payload_size_bytes + << + payload :: binary-size(size), + remaining :: binary + >> = data + + message = %__MODULE__{ + header: header, + payload: Payload.parse(header.command, payload) + } + + [message, remaining] + end + + end + + @doc """ + Returns message type associated with given command + """ + def message_type(command), do: @commands[command] + @doc """ - Returns module which can parse and build messages with specified command + Returns command associated with given message type """ - def handler(command), do: @command_handlers[command] + def command_name(message_type) when message_type in @message_types do + @commands + |> Enum.find(fn {_k,v} -> v == message_type end) + |> elem(0) + end + + @doc """ + List of supported commands + """ + def command_names, do: @command_names + + @doc """ + Serialize message type struct into a full binary message that is ready to be send over the network + """ + def serialize(%{__struct__: message_type} = struct) when message_type in @message_types do + + << network_identifier :: unsigned-little-integer-size(32) >> = <<0xF9, 0xBE, 0xB4, 0xD9>># TODO read from config (e.g. magic[Node.network()] + + payload = message_type.serialize(struct) + header = %Header{ + network_identifier: network_identifier, + command: message_type |> command_name, + payload_size_bytes: byte_size(payload), + checksum: Header.checksum(payload) + } + + Header.serialize(header) <> payload + end end diff --git a/lib/bitcoin/protocol/message/header.ex b/lib/bitcoin/protocol/message/header.ex new file mode 100644 index 0000000..e51e39b --- /dev/null +++ b/lib/bitcoin/protocol/message/header.ex @@ -0,0 +1,52 @@ +defmodule Bitcoin.Protocol.Message.Header do + + @known_network_identifiers %{ + main: <<0xF9, 0xBE, 0xB4, 0xD9>>, + testnet: <<0xFA, 0xBF, 0xB5, 0xDA>>, + testnet3: <<0x0B, 0x11, 0x09, 0x07>>, + namecoin: <<0xF9, 0xBE, 0xB4, 0xFE>> + } + + defstruct network_identifier: 0, + command: <<>>, + payload_size_bytes: 0, + checksum: 0 + + @type t :: %__MODULE__{ + network_identifier: non_neg_integer, + command: String.t, + payload_size_bytes: non_neg_integer, + checksum: non_neg_integer # sha256(sha256(payload)) first four bytes + } + + def parse(<>) do + + %__MODULE__{ + network_identifier: network_identifier, + command: command |> String.trim_trailing(<<0>>), + payload_size_bytes: payload_size_bytes, + checksum: checksum + } + end + + def serialize(%__MODULE__{} = s) do + command = s.command |> String.pad_trailing(12, <<0>>) + << + s.network_identifier :: unsigned-little-integer-size(32), + command :: bytes-size(12), + s.payload_size_bytes :: unsigned-little-integer-size(32), + s.checksum :: unsigned-little-integer-size(32) + >> + end + + def checksum(payload) do + << result :: unsigned-little-integer-size(32), _ :: binary >> = :crypto.hash(:sha256, :crypto.hash(:sha256, payload)) + result + end + +end + diff --git a/lib/bitcoin/protocol/message/payload.ex b/lib/bitcoin/protocol/message/payload.ex new file mode 100644 index 0000000..374a4c1 --- /dev/null +++ b/lib/bitcoin/protocol/message/payload.ex @@ -0,0 +1,38 @@ +defmodule Bitcoin.Protocol.Message.Payload do + + require Lager + + @command_names Bitcoin.Protocol.Message.command_names() + + defimpl String.Chars, for: Payload do + @spec to_string(Payload.t) :: String.t + def to_string(data) do + """ + parsed data: + #{data.payload |> String.Chars.to_string()} + raw data: + #{"0x" <> Base.encode16(data.raw_data)} + """ |> String.strip() + end + end + + defstruct raw_data: <<>>, + message: <<>> + + @type t :: %__MODULE__{ + raw_data: binary, + message: binary + } + + def parse(command, data) when command in @command_names do + %__MODULE__{ + raw_data: data, + message: data |> Bitcoin.Protocol.Message.message_type(command).parse + } + end + + def parse(command, data) do + Lager.info "Unknown command: #{command |> inspect} data[#{byte_size(data)}]" + end + +end diff --git a/lib/bitcoin/protocol/messages/inv.ex b/lib/bitcoin/protocol/messages/inv.ex index 13a6bf1..f50e107 100644 --- a/lib/bitcoin/protocol/messages/inv.ex +++ b/lib/bitcoin/protocol/messages/inv.ex @@ -9,12 +9,14 @@ defmodule Bitcoin.Protocol.Messages.Inv do alias Bitcoin.Protocol.Types.Integer alias Bitcoin.Protocol.Types.InventoryVector + defstruct inventory_vectors: [] @type t :: %__MODULE__{ inventory_vectors: [InventoryVector] } + def parse(data) do [count, payload] = Integer.parse_stream(data) diff --git a/lib/bitcoin/protocol/messages/verack.ex b/lib/bitcoin/protocol/messages/verack.ex index bb47cf7..38bac94 100644 --- a/lib/bitcoin/protocol/messages/verack.ex +++ b/lib/bitcoin/protocol/messages/verack.ex @@ -7,8 +7,10 @@ defmodule Bitcoin.Protocol.Messages.Verack do https://en.bitcoin.it/wiki/Protocol_specification#verack """ + defstruct [] + def parse(_data) do - %{} + %__MODULE__{} end def serialize(_), do: <<>> diff --git a/lib/bitcoin/protocol/types/inventory_vector.ex b/lib/bitcoin/protocol/types/inventory_vector.ex index da303c7..23ce2fe 100644 --- a/lib/bitcoin/protocol/types/inventory_vector.ex +++ b/lib/bitcoin/protocol/types/inventory_vector.ex @@ -15,6 +15,12 @@ defmodule Bitcoin.Protocol.Types.InventoryVector do hash: String.t } + defimpl Inspect, for: __MODULE__ do + def inspect(data, _opts) do + "%InventoryVector{ #{data.reference_type} :: #{data.hash |> Base.encode16} }" + end + end + def parse(<>) do %Bitcoin.Protocol.Types.InventoryVector{ reference_type: type_id |> get_type_name, diff --git a/lib/bitcoin/protocol/types/network_address.ex b/lib/bitcoin/protocol/types/network_address.ex index c314f50..8d9292b 100644 --- a/lib/bitcoin/protocol/types/network_address.ex +++ b/lib/bitcoin/protocol/types/network_address.ex @@ -80,7 +80,7 @@ defmodule Bitcoin.Protocol.Types.NetworkAddress do # Convert address bytes to erlang :inet ip adress, IPv4 def addr_to_inet(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, b1, b2, b3, b4>>), do: {b1, b2, b3, b4} def addr_to_inet(<<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, b1, b2, b3, b4>>), do: {b1, b2, b3, b4} - def addr_to_inet(<< ipv6 :: binary-size(128) >>), do: {0,0,0,0} #TODO IPv6 + def addr_to_inet(<< ipv6 :: binary-size(16) >>), do: {0,0,0,0} #TODO IPv6 # Convert erlang inet ip adress to address byptes IPv4 (TODO IPv6) def inet_to_addr({b1, b2, b3, b4}), do: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, b1, b2, b3, b4>> diff --git a/lib/bitcoin/protocol/types/outpoint.ex b/lib/bitcoin/protocol/types/outpoint.ex index 3066b50..0c2f9fe 100644 --- a/lib/bitcoin/protocol/types/outpoint.ex +++ b/lib/bitcoin/protocol/types/outpoint.ex @@ -8,6 +8,12 @@ defmodule Bitcoin.Protocol.Types.Outpoint do index: non_neg_integer } + defimpl Inspect, for: __MODULE__ do + def inspect(data, _opts) do + "%Outpoint{ ##{data.index} hash: #{data.hash |> Base.encode16} }" + end + end + def parse_stream(<>) do [%__MODULE__{ hash: hash, diff --git a/lib/bitcoin/protocol/types/transaction_input.ex b/lib/bitcoin/protocol/types/transaction_input.ex index 0f32a48..c4f9def 100644 --- a/lib/bitcoin/protocol/types/transaction_input.ex +++ b/lib/bitcoin/protocol/types/transaction_input.ex @@ -13,6 +13,12 @@ defmodule Bitcoin.Protocol.Types.TransactionInput do sequence: non_neg_integer } + defimpl Inspect, for: __MODULE__ do + def inspect(data, _opts) do + "%In{ ##{data.sequence} output: #{data.previous_output |> Kernel.inspect}, sig: #{data.signature_script |> Base.encode16} }" + end + end + def parse_stream(data) do [outpoint, payload] = Outpoint.parse_stream(data) diff --git a/lib/bitcoin/protocol/types/transaction_output.ex b/lib/bitcoin/protocol/types/transaction_output.ex index 153246a..89894b5 100644 --- a/lib/bitcoin/protocol/types/transaction_output.ex +++ b/lib/bitcoin/protocol/types/transaction_output.ex @@ -10,6 +10,12 @@ defmodule Bitcoin.Protocol.Types.TransactionOutput do pk_script: bitstring } + defimpl Inspect, for: __MODULE__ do + def inspect(data, _opts) do + "%Out{ #{data.value} -> #{data.pk_script |> Base.encode16} }" + end + end + def parse_stream(data) do <> = data diff --git a/lib/bitcoin/util.ex b/lib/bitcoin/util.ex new file mode 100644 index 0000000..125ad9b --- /dev/null +++ b/lib/bitcoin/util.ex @@ -0,0 +1,24 @@ +defmodule Bitcoin.Util do + + # Random 64 bit nonce + def nonce64, do: (:rand.uniform(0xFF_FF_FF_FF_FF_FF_FF_FF) |> round) - 1 + + # Timestamp represented as a float + def militime do + {megas, s, milis} = :os.timestamp + 1.0e6*megas + s + milis * 1.0e-6 + end + + # Helper to run series of functions as a validation. + # It returns :ok if all functions return :ok + # Otherwise, first encountered error is returned. + def run_validations(funs, struct) do + funs |> Enum.reduce(:ok, fn(fun, status) -> + case status do + :ok -> fun.(struct) + error -> error + end + end) + end + +end diff --git a/mix.exs b/mix.exs index 8fed0f2..3e63c1d 100644 --- a/mix.exs +++ b/mix.exs @@ -14,7 +14,7 @@ defmodule Bitcoin.Mixfile do # # Type `mix help compile.app` for more information def application do - [mod: { Bitcoin.Node, [] }, + [mod: { Bitcoin, [] }, applications: [ :exlager ] diff --git a/test/node/network/addr_test.exs b/test/node/network/addr_test.exs new file mode 100644 index 0000000..833cc8a --- /dev/null +++ b/test/node/network/addr_test.exs @@ -0,0 +1,23 @@ +defmodule Bitcoin.Node.Network.AddrTest do + use ExUnit.Case + + alias Bitcoin.Node.Network.Addr + alias Bitcoin.Protocol.Types.NetworkAddress, as: NA + + test "addrs management" do + {:ok, _pid} = Addr.start_link + Addr.clear() + assert Addr.count() == 0 + assert Addr.get() == nil + na1 = %NA{address: {1,2,3,4}} + na2 = %NA{address: {1,2,3,5}} + na1 |> Addr.add + assert Addr.count() == 1 + assert Addr.get() == na1 + na2 |> Addr.add + assert Addr.count() == 2 + # couldn't figure out how to force :rand.seed (it seems to be local per process?) + results = (1..100) |> Enum.reduce(MapSet.new, fn(_x, r) -> r |> MapSet.put(Addr.get()) end) + assert results == MapSet.new([na1, na2]) + end +end diff --git a/test/util_test.exs b/test/util_test.exs new file mode 100644 index 0000000..77fe33c --- /dev/null +++ b/test/util_test.exs @@ -0,0 +1,25 @@ +defmodule Bitcoin.UTilTest do + use ExUnit.Case + + alias Bitcoin.Util + + test "nonce64" do + nonce1 = Util.nonce64() + assert nonce1 >= 0 + assert nonce1 < 0xFF_FF_FF_FF_FF_FF_FF_FF + nonce2 = Util.nonce64() + assert nonce1 != nonce2 # randomly failing test ;) + end + + test "run validations" do + funs = [ + fn x -> if rem(x,2) == 0, do: :ok, else: {:error, :parity} end, + fn x -> if x >= 5, do: :ok, else: {:error, :below5} end, + fn x -> if x >= 10, do: :ok, else: {:error, :below10} end, + ] + assert Util.run_validations(funs, 10) == :ok + assert Util.run_validations(funs, 3) == {:error, :parity} + assert Util.run_validations(funs, 2) == {:error, :below5} + assert Util.run_validations(funs, 8) == {:error, :below10} + end +end