From 457fcdb19aa775683a8d3a18cf8b9e4cabf0f67e Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Thu, 24 Jan 2019 16:53:27 +0100 Subject: [PATCH 01/12] Implement INIT message --- lib/boltex/bolt.ex | 96 ++++++++++++++++--- lib/boltex/pack_stream/message.ex | 2 +- lib/boltex/pack_stream/message/encoder.ex | 37 +++++++ test/boltex/bolt_test.exs | 15 +-- .../pack_stream/message/encoder_test.exs | 10 +- test/support/integration_case.ex | 28 ++---- 6 files changed, 145 insertions(+), 43 deletions(-) diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index e329d64..983e046 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -9,13 +9,13 @@ defmodule Boltex.Bolt do @zero_chunk <<0x00, 0x00>> - @max_version 2 + @max_version 3 @summary ~w(success ignored failure)a @moduledoc """ - A library that handles Bolt Protocol (v1 and v2). - Note that for now, only Neo4j implements Bolt v2. + A library that handles Bolt Protocol (v1, v2 and v3). + Note that for now, only Neo4j implements Bolt v2 and v3. It handles all the protocol specific steps (i.e. handshake, init) as well as sending and receiving messages and wrapping @@ -95,8 +95,15 @@ defmodule Boltex.Bolt do ## Options See "Shared options" in the documentation of this module. + + Returns an ok-tuple containing the bolt protocol version to be used. + + ## Example: + + iex> Bolt.handshake(:gen_tcp, port) + {:ok, 3} """ - @spec handshake(atom(), port(), Keyword.t()) :: :ok | {:error, Boltex.Error.t()} + @spec handshake(atom(), port(), Keyword.t()) :: {:ok, integer()} | {:error, Boltex.Error.t()} def handshake(transport, port, options \\ []) do recv_timeout = get_recv_timeout(options) @@ -120,7 +127,7 @@ defmodule Boltex.Bolt do {:ok, <> = packet} when version <= @max_version -> Boltex.Logger.log_message(:server, :handshake, packet, :hex) Boltex.Logger.log_message(:server, :handshake, version) - :ok + {:ok, version} {:ok, other} -> {:error, Error.exception(other, port, :handshake)} @@ -131,7 +138,7 @@ defmodule Boltex.Bolt do end @doc """ - Initialises the connection. + Initialises the connection (Bolt v1 and v2). Expects a transport module (i.e. `gen_tcp`) and a `Port`. Accepts authorisation params in the form of {username, password}. @@ -150,17 +157,51 @@ defmodule Boltex.Bolt do """ @spec init(atom(), port(), tuple(), Keyword.t()) :: {:ok, any()} | {:error, Boltex.Error.t()} def init(transport, port, auth \\ {}, options \\ []) do - send_message(transport, port, {:init, [auth]}) + initialize(:init, transport, port, auth, options) + end + + @doc """ + Initialises the connection (Bolt v3). + + Expects a transport module (i.e. `gen_tcp`) and a `Port`. Accepts + authorisation params in the form of {username, password}. + + ## Options + + See "Shared options" in the documentation of this module. + + ## Examples + + iex> Boltex.Bolt.hello :gen_tcp, port + {:ok, info} + + iex> Boltex.Bolt.hello :gen_tcp, port, {"username", "password"} + {:ok, info} + """ + @spec hello(atom(), port(), tuple(), Keyword.t()) :: {:ok, any()} | {:error, Boltex.Error.t()} + def hello(transport, port, auth \\ {}, options \\ []) do + initialize(:hello, transport, port, auth, options) + end + + @spec initialize( + Boltex.PackStream.Message.out_signature(), + atom(), + port(), + tuple(), + Keyword.t() + ) :: {:ok, any()} | {:error, Boltex.Error.t()} + defp initialize(signature, transport, port, auth, options) do + send_message(transport, port, {signature, [auth]}) case receive_data(transport, port, options) do {:success, info} -> {:ok, info} {:failure, response} -> - {:error, Error.exception(response, port, :init)} + {:error, Error.exception(response, port, signature)} other -> - {:error, Error.exception(other, port, :init)} + {:error, Error.exception(other, port, signature)} end end @@ -222,6 +263,35 @@ defmodule Boltex.Bolt do end end + def run_statement_with_metadata( + transport, + port, + statement, + params \\ %{}, + metadata \\ %{}, + options \\ [] + ) do + data = [statement, params, metadata] + + with :ok <- send_message(transport, port, {:run, data}), + {:success, _} = data <- receive_data(transport, port, options), + :ok <- send_message(transport, port, {:pull_all, []}), + more_data <- receive_data(transport, port, options), + more_data = List.wrap(more_data), + {:success, _} <- List.last(more_data) do + [data | more_data] + else + {:failure, map} -> + Boltex.Error.exception(map, port, :run_statement) + + error = %Boltex.Error{} -> + error + + error -> + Boltex.Error.exception(error, port, :run_statement) + end + end + @doc """ Implementation of Bolt's ACK_FAILURE. It acknowledges a failure while keeping transactions alive. @@ -291,7 +361,8 @@ defmodule Boltex.Bolt do @spec receive_data(atom(), port(), Keyword.t(), list()) :: {atom(), Boltex.PackStream.value()} | {:error, any()} def receive_data(transport, port, options \\ [], previous \\ []) do - with {:ok, data} <- do_receive_data(transport, port, options) do + with {:ok, data} <- do_receive_data(transport, port, options), + _ <- IO.puts(inspect(data)) do case Message.decode(data) do {:record, _} = data -> receive_data(transport, port, options, [data | previous]) @@ -318,7 +389,10 @@ defmodule Boltex.Bolt do defp do_receive_data(transport, port, options) do recv_timeout = get_recv_timeout(options) - case transport.recv(port, 2, recv_timeout) do + rec = transport.recv(port, 2, recv_timeout) + IO.puts(inspect(rec)) + + case rec do {:ok, <>} -> do_receive_data_(transport, port, chunk_size, options, <<>>) diff --git a/lib/boltex/pack_stream/message.ex b/lib/boltex/pack_stream/message.ex index 27f951c..d2ae662 100644 --- a/lib/boltex/pack_stream/message.ex +++ b/lib/boltex/pack_stream/message.ex @@ -10,7 +10,7 @@ defmodule Boltex.PackStream.Message do alias Boltex.PackStream.Message.Decoder @type in_signature :: :failure | :ignored | :record | :success - @type out_signature :: :init | :run | :ack_failure | :discard_all | :pull_all | :reset + @type out_signature :: :ack_failure | :discard_all | :hello | :init | :pull_all | :reset | :run @type raw :: {out_signature, list()} @type decoded :: {in_signature(), any()} @type encoded :: <<_::16, _::_*8>> diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index 5672fbb..46d0866 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -16,6 +16,7 @@ defmodule Boltex.PackStream.Message.Encoder do @ack_failure_signature 0x0E @discard_all_signature 0x2F + @hello_signature 0x01 @init_signature 0x01 @pull_all_signature 0x3F @reset_signature 0x0F @@ -50,6 +51,41 @@ defmodule Boltex.PackStream.Message.Encoder do do_encode(:init, [@client_name, auth_params(auth)]) end + @doc """ + Encode HELLO message without auth token; + + HELLO message is similar to INIT but is used by Bolt protocol v3 and higher + + ## Example: + iex(1)> Message.encode({:hello, [{"neo4j", "password"}]}) + <<0, 77, 177, 1, 164, 139, 99, 114, 101, 100, 101, 110, 116, 105, 97, 108, 115, + 136, 112, 97, 115, 115, 119, 111, 114, 100, 137, 112, 114, 105, 110, 99, 105, + 112, 97, 108, 133, 110, 101, 111, 52, 106, 134, 115, 99, 104, 101, 109, 101, + 133, ...>> + """ + def encode({:hello, []}) do + encode({:hello, [{}]}) + end + + @doc """ + Encode HELLO message with auth token + + HELLO message is similar to INIT but is used by Bolt protocol v3 and higher + + ## Example: + iex(1)> Message.encode({:hello, []}) + <<0, 27, 177, 1, 161, 138, 117, 115, 101, 114, 95, 97, 103, 101, 110, 116, 140, + 66, 111, 108, 116, 101, 120, 47, 48, 46, 53, 46, 48, 0, 0>> + """ + def encode({:hello, [auth]}) do + params = + auth + |> auth_params() + |> Map.put(:user_agent, @client_name) + + do_encode(:hello, [params]) + end + @doc """ Encode RUN message with its data: statement and parameters @@ -112,6 +148,7 @@ defmodule Boltex.PackStream.Message.Encoder do @spec signature(Boltex.PackStream.Message.out_signature()) :: integer() defp signature(:ack_failure), do: @ack_failure_signature defp signature(:discard_all), do: @discard_all_signature + defp signature(:hello), do: @hello_signature defp signature(:init), do: @init_signature defp signature(:pull_all), do: @pull_all_signature defp signature(:reset), do: @reset_signature diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index ad40c3e..26d6674 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -87,17 +87,18 @@ defmodule Boltex.BoltTest do end end - test "Temporal / patial types does not work prior to Neo4j 3.4", %{ - port: port, - is_bolt_v2: is_bolt_v2 - } do - test_bolt_v2(port, is_bolt_v2) + test "Temporal / spatial types does not work prior to bolt version 2", + %{ + port: port, + bolt_version: bolt_version + } do + test_bolt_version(port, bolt_version) end @doc """ Test valid returns for Bolt V1. """ - def test_bolt_v2(port, false) do + def test_bolt_version(port, 1) do assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "RETURN date('2018-01-01') as d") @@ -175,7 +176,7 @@ defmodule Boltex.BoltTest do @doc """ Test valid returns for Bolt V2. """ - def test_bolt_v2(port, true) do + def test_bolt_version(port, _) do assert [ success: %{"fields" => ["d"], "result_available_after" => _}, record: [[sig: 68, fields: [17167]]], diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index d25f7ff..00b12b0 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -14,10 +14,16 @@ defmodule Boltex.PackStream.Message.EncoderTest do assert <<0x0, 0x2, 0xB0, 0x2F, 0x0, 0x0>> = Encoder.encode({:discard_all, []}) end + test "hello" do + assert <<0x0, _, 0xB1, 0x1, _::binary>> = Encoder.encode({:hello, []}) + + assert <<0x0, _, 0xB1, 0x1, _::binary>> = Encoder.encode({:hello, [{"neo4j", "password"}]}) + end + test "init" do - assert <<0x0, 0x10, 0xB2, 0x1, _::binary>> = Encoder.encode({:init, []}) + assert <<0x0, _, 0xB2, 0x1, _::binary>> = Encoder.encode({:init, []}) - assert <<0x0, 0x42, 0xB2, 0x1, _::binary>> = Encoder.encode({:init, [{"neo4j", "password"}]}) + assert <<0x0, _, 0xB2, 0x1, _::binary>> = Encoder.encode({:init, [{"neo4j", "password"}]}) end test "pull_all" do diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index 565f6da..92f2e7a 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -7,23 +7,19 @@ defmodule Boltex.IntegrationCase do uri = neo4j_uri() port_opts = [active: false, mode: :binary, packet: :raw] {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) - :ok = Bolt.handshake(:gen_tcp, port) + {:ok, version} = Bolt.handshake(:gen_tcp, port) - # Neo4j 3.0 does'nt return serverr info on INIT - is_bolt_v2 = - case Bolt.init(:gen_tcp, port, uri.userinfo) do - {:ok, %{"server" => server}} -> - is_bolt_v2?(server) - - {:ok, %{}} -> - false + result = + cond do + version == 3 -> Bolt.hello(:gen_tcp, port, uri.userinfo) + version < 3 -> Bolt.init(:gen_tcp, port, uri.userinfo) end on_exit(fn -> :gen_tcp.close(port) end) - {:ok, port: port, is_bolt_v2: is_bolt_v2} + {:ok, port: port, bolt_version: version} end def neo4j_uri do @@ -41,16 +37,4 @@ defmodule Boltex.IntegrationCase do |> List.to_tuple() end) end - - defp is_bolt_v2?(server) do - regex = ~r/Neo4j\/(?[\d])\.(?[\d])\.(?[\d])/ - version_info = Regex.named_captures(regex, server) - - if String.to_integer(version_info["major"]) >= 3 and - String.to_integer(version_info["minor"]) >= 4 do - true - else - false - end - end end From a1f57406a455ce28afe1205aee08743e33de34c2 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Sun, 27 Jan 2019 12:54:00 +0100 Subject: [PATCH 02/12] Implement new RUN format --- lib/boltex/bolt.ex | 42 +++----- lib/boltex/pack_stream/message/encoder.ex | 35 ++++++- lib/boltex/version_agent.ex | 24 +++++ test/boltex/bolt_test.exs | 120 ++++++++++++++-------- 4 files changed, 147 insertions(+), 74 deletions(-) create mode 100644 lib/boltex/version_agent.ex diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index 983e046..ab05d0b 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -127,6 +127,7 @@ defmodule Boltex.Bolt do {:ok, <> = packet} when version <= @max_version -> Boltex.Logger.log_message(:server, :handshake, packet, :hex) Boltex.Logger.log_message(:server, :handshake, version) + Boltex.VersionAgent.start_link(version) {:ok, version} {:ok, other} -> @@ -241,38 +242,23 @@ defmodule Boltex.Bolt do Boltex.PackStream.Message.decoded() ] | Boltex.Error.t() + def run_statement(transport, port, statement, params \\ %{}, options \\ []) do data = [statement, params] - - with :ok <- send_message(transport, port, {:run, data}), - {:success, _} = data <- receive_data(transport, port, options), - :ok <- send_message(transport, port, {:pull_all, []}), - more_data <- receive_data(transport, port, options), - more_data = List.wrap(more_data), - {:success, _} <- List.last(more_data) do - [data | more_data] - else - {:failure, map} -> - Boltex.Error.exception(map, port, :run_statement) - - error = %Boltex.Error{} -> - error - - error -> - Boltex.Error.exception(error, port, :run_statement) - end + do_run_statement(transport, port, data, options) end - def run_statement_with_metadata( - transport, - port, - statement, - params \\ %{}, - metadata \\ %{}, - options \\ [] - ) do + @spec run_statement_with_metadata(atom(), port(), String.t(), map(), map(), Keyword.t()) :: + [ + Boltex.PackStream.Message.decoded() + ] + | Boltex.Error.t() + def run_statement_with_metadata(transport, port, statement, params, metadata, options \\ []) do data = [statement, params, metadata] + do_run_statement(transport, port, data, options) + end + defp do_run_statement(transport, port, data, options) do with :ok <- send_message(transport, port, {:run, data}), {:success, _} = data <- receive_data(transport, port, options), :ok <- send_message(transport, port, {:pull_all, []}), @@ -361,8 +347,7 @@ defmodule Boltex.Bolt do @spec receive_data(atom(), port(), Keyword.t(), list()) :: {atom(), Boltex.PackStream.value()} | {:error, any()} def receive_data(transport, port, options \\ [], previous \\ []) do - with {:ok, data} <- do_receive_data(transport, port, options), - _ <- IO.puts(inspect(data)) do + with {:ok, data} <- do_receive_data(transport, port, options) do case Message.decode(data) do {:record, _} = data -> receive_data(transport, port, options, [data | previous]) @@ -390,7 +375,6 @@ defmodule Boltex.Bolt do recv_timeout = get_recv_timeout(options) rec = transport.recv(port, 2, recv_timeout) - IO.puts(inspect(rec)) case rec do {:ok, <>} -> diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index 46d0866..09166a7 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -89,6 +89,15 @@ defmodule Boltex.PackStream.Message.Encoder do @doc """ Encode RUN message with its data: statement and parameters + RUN takes only 2 parameters in V2: + - the statement to execute + - the statement binds + + RUN takes 3 parameters in V3: + - the statement to execute + - the statement binds + - metadata + ## Example iex> Message.encode({:run, ["RETURN 1 AS num"]}) <<0, 19, 178, 16, 143, 82, 69, 84, 85, 82, 78, 32, 49, 32, 65, 83, 32, 110, 117, @@ -99,8 +108,12 @@ defmodule Boltex.PackStream.Message.Encoder do """ - def encode({:run, [statement]}) do - do_encode(:run, [statement, %{}]) + def encode({:run, [_statement]} = message) do + encode_run(message, Boltex.VersionAgent.get()) + end + + def encode({:run, [_statment, _params]} = message) do + encode_run(message, Boltex.VersionAgent.get()) end @doc """ @@ -120,6 +133,24 @@ defmodule Boltex.PackStream.Message.Encoder do do_encode(message_type, data) end + @spec encode_run({Boltex.PackStream.Message.out_signature(), list()}, integer()) :: + Boltex.PackStream.Message.encoded() + defp encode_run({:run, [statement]}, bolt_version) when bolt_version <= 2 do + do_encode(:run, [statement, %{}]) + end + + defp encode_run({:run, [statement]}, _) do + do_encode(:run, [statement, %{}, %{}]) + end + + defp encode_run({:run, [statement, params]}, bolt_version) when bolt_version <= 2 do + do_encode(:run, [statement, params]) + end + + defp encode_run({:run, [statement, params]}, _) do + do_encode(:run, [statement, params, %{}]) + end + @spec do_encode(Boltex.PackStream.Message.out_signature(), list()) :: Boltex.PackStream.Message.encoded() defp do_encode(message_type, data) do diff --git a/lib/boltex/version_agent.ex b/lib/boltex/version_agent.ex new file mode 100644 index 0000000..1b9af66 --- /dev/null +++ b/lib/boltex/version_agent.ex @@ -0,0 +1,24 @@ +defmodule Boltex.VersionAgent do + @moduledoc """ + Since Bolt v3, `RUN` gets an additional parameter. + Boltex user shouldn't be forced to store and pass protocol version to function, + therefore we store it here + """ + use Agent + + @doc """ + Start the agent with current protocol version + """ + @spec start_link(integer()) :: {:error, any()} | {:ok, pid()} + def start_link(version) do + Agent.start_link(fn -> version end, name: __MODULE__) + end + + @doc """ + Return the current protocol version + """ + @spec get() :: integer() + def get() do + Agent.get(__MODULE__, & &1) + end +end diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index 26d6674..d84cf6c 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -34,13 +34,21 @@ defmodule Boltex.BoltTest do assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "What?") end - test "allows to recover from error with ack_failure", %{port: port} do - assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "What?") - assert :ok = Bolt.ack_failure(:gen_tcp, port) - assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + test "allows to recover from error with ack_failure for bolt v1 & v2", %{ + port: port, + bolt_version: bolt_version + } do + if bolt_version <= 2 do + assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "What?") + assert :ok = Bolt.ack_failure(:gen_tcp, port) + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + end end - test "allows to recover from error with reset", %{port: port} do + # RESET doesn't exists in Bolt V3! + test "allows to recover from error with reset", %{ + port: port + } do assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "What?") assert :ok = Bolt.reset(:gen_tcp, port) assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") @@ -67,18 +75,6 @@ defmodule Boltex.BoltTest do Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") end - test "works within a transaction", %{port: port} do - assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "BEGIN") - assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") - assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "COMMIT") - end - - test "works with rolled-back transactions", %{port: port} do - assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "BEGIN") - assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") - assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "ROLLBACK") - end - test "an invalid parameter value yields an error", %{port: port} do cypher = "MATCH (n:Person {invalid: {an_elixir_datetime}}) RETURN TRUE" @@ -87,18 +83,54 @@ defmodule Boltex.BoltTest do end end + test "RUN with metadata (Bolt >= 3)", %{port: port, bolt_version: bolt_version} do + if bolt_version >= 3 do + assert [{:success, _}, {:record, _}, {:success, _}] = + Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, %{}) + + metadata = %{ + tx_timeout: 1000, + bookmarks: ["neo4j:bookmark:v1:tx16732"], + tx_metadata: %{ + name: "my_tx" + } + } + + assert [{:success, _}, {:record, _}, {:success, _}] = + Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, metadata) + end + end + + test "Transactions work differently in v3", %{port: _port, bolt_version: _bolt_version} do + end + test "Temporal / spatial types does not work prior to bolt version 2", %{ port: port, bolt_version: bolt_version } do - test_bolt_version(port, bolt_version) + test_spatial_and_temporal(port, bolt_version) + end + + def test_transactions(port, bolt_version) when bolt_version <= 2 do + # Works within a transaction + assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "BEGIN") + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "COMMIT") + + # works with rolled-back transactions + assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "BEGIN") + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "ROLLBACK") + end + + def test_transactions(_port, _) do end @doc """ Test valid returns for Bolt V1. """ - def test_bolt_version(port, 1) do + def test_spatial_and_temporal(port, 1) do assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "RETURN date('2018-01-01') as d") @@ -174,37 +206,39 @@ defmodule Boltex.BoltTest do end @doc """ - Test valid returns for Bolt V2. + Test valid returns for Bolt V2 & V3. """ - def test_bolt_version(port, _) do + def test_spatial_and_temporal(port, _) do assert [ - success: %{"fields" => ["d"], "result_available_after" => _}, + # success: %{"fields" => ["d"], "result_available_after" => _}, + success: %{"fields" => ["d"]}, record: [[sig: 68, fields: [17167]]], - success: %{"result_consumed_after" => _, "type" => "r"} + # success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN date('2017-01-01') as d") assert [ - success: %{"fields" => ["t"], "result_available_after" => _}, + success: %{"fields" => ["t"]}, record: [[sig: 84, fields: [45_930_250_000_000, 3600]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN time('12:45:30.25+01:00') AS t") assert [ - success: %{"fields" => ["t"], "result_available_after" => _}, + success: %{"fields" => ["t"]}, record: [[sig: 116, fields: [45_930_250_000_000]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN localtime('12:45:30.25') AS t") assert [ - success: %{"fields" => ["d"], "result_available_after" => _}, + success: %{"fields" => ["d"]}, record: [[sig: 69, fields: [15, 34, 54, 5550]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN duration('P1Y3M34DT54.00000555S') AS d") assert [ - success: %{"fields" => ["d"], "result_available_after" => _}, + success: %{"fields" => ["d"]}, record: [[sig: 100, fields: [1_522_931_640, 543_000_000]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement( :gen_tcp, @@ -213,9 +247,9 @@ defmodule Boltex.BoltTest do ) assert [ - success: %{"fields" => ["d"], "result_available_after" => _}, + success: %{"fields" => ["d"]}, record: [[sig: 70, fields: [1_522_931_663, 543_000_000, 3600]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement( :gen_tcp, @@ -224,9 +258,9 @@ defmodule Boltex.BoltTest do ) assert [ - success: %{"fields" => ["d"], "result_available_after" => _}, + success: %{"fields" => ["d"]}, record: [[sig: 102, fields: [1_522_931_663, 543_000_000, "Europe/Berlin"]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement( :gen_tcp, @@ -235,15 +269,15 @@ defmodule Boltex.BoltTest do ) assert [ - success: %{"fields" => ["p"], "result_available_after" => _}, + success: %{"fields" => ["p"]}, record: [[sig: 88, fields: [7203, 40.0, 45.0]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN point({x: 40, y: 45}) AS p") assert [ - success: %{"fields" => ["p"], "result_available_after" => _}, + success: %{"fields" => ["p"]}, record: [[sig: 88, fields: [4326, 40.0, 45.0]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement( :gen_tcp, @@ -252,15 +286,15 @@ defmodule Boltex.BoltTest do ) assert [ - success: %{"fields" => ["p"], "result_available_after" => _}, + success: %{"fields" => ["p"]}, record: [[sig: 89, fields: [9157, 40.0, 45.0, 150.0]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement(:gen_tcp, port, "RETURN point({x: 40, y: 45, z: 150}) AS p") assert [ - success: %{"fields" => ["p"], "result_available_after" => _}, + success: %{"fields" => ["p"]}, record: [[sig: 89, fields: [4979, 40.0, 45.0, 150.0]]], - success: %{"result_consumed_after" => _, "type" => "r"} + success: %{"type" => "r"} ] = Bolt.run_statement( :gen_tcp, From badc2c86911144936b7a165d20f87e5084f65e84 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Sun, 27 Jan 2019 19:50:38 +0100 Subject: [PATCH 03/12] Implement new transaction message: BEGIN, COMMIT, ROLLBACK + Add Metadata structure --- lib/boltex/bolt.ex | 76 +++++++++++++++++-- lib/boltex/metadata.ex | 68 +++++++++++++++++ lib/boltex/pack_stream/message.ex | 12 ++- lib/boltex/pack_stream/message/encoder.ex | 24 +++++- test/boltex/bolt_test.exs | 23 +++++- test/boltex/metadata_test.exs | 69 +++++++++++++++++ .../pack_stream/message/encoder_test.exs | 52 ++++++++++--- test/boltex/pack_stream/message_test.exs | 13 +++- test/support/encoder_case.ex | 20 +++++ test/support/integration_case.ex | 28 ++----- test/support/uri_helper.ex | 21 +++++ 11 files changed, 360 insertions(+), 46 deletions(-) create mode 100644 lib/boltex/metadata.ex create mode 100644 test/boltex/metadata_test.exs create mode 100644 test/support/encoder_case.ex create mode 100644 test/support/uri_helper.ex diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index ab05d0b..d13e035 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -290,12 +290,7 @@ defmodule Boltex.Bolt do """ @spec ack_failure(atom(), port(), Keyword.t()) :: :ok | Boltex.Error.t() def ack_failure(transport, port, options \\ []) do - send_message(transport, port, {:ack_failure, []}) - - case receive_data(transport, port, options) do - {:success, %{}} -> :ok - error -> Boltex.Error.exception(error, port, :ack_failure) - end + treat_simple_message(:ack_failure, transport, port, options) end @doc """ @@ -310,11 +305,76 @@ defmodule Boltex.Bolt do """ @spec reset(atom(), port(), Keyword.t()) :: :ok | Boltex.Error.t() def reset(transport, port, options \\ []) do - send_message(transport, port, {:reset, []}) + treat_simple_message(:reset, transport, port, options) + end + + @doc """ + Implementation of Bolt's BEGIN message. It opens a transaction. + + ## Options + + See "Shared options" in the documentation of this module. + """ + @spec begin(atom(), port(), map(), Keyword.t()) :: :ok | Boltex.Error.t() + def begin(transport, port, metadata \\ %{}, options \\ []) do + with {:ok, begin_metadata} <- Boltex.Metadata.new(metadata) do + send_message( + transport, + port, + {:begin, [begin_metadata]} + ) + + case receive_data(transport, port, options) do + {:success, %{}} -> :ok + error -> Boltex.Error.exception(error, port, :begin) + end + else + {:error, error} -> Boltex.Error.exception(error, port, :begin) + end + end + + @doc """ + Implementation of Bolt's COMMIT message. It commits an opened transaction. + + ## Options + + See "Shared options" in the documentation of this module. + """ + @spec commit(atom(), port(), Keyword.t()) :: :ok | Boltex.Error.t() + def commit(transport, port, options \\ []) do + send_message(transport, port, {:commit, []}) + + case receive_data(transport, port, options) do + {:success, metadata} -> {:success, metadata} + error -> Boltex.Error.exception(error, port, :commit) + end + end + + @doc """ + Implementation of Bolt's ROLLBACK message. It rollbacks an opened transaction. + + ## Options + + See "Shared options" in the documentation of this module. + """ + @spec rollback(atom(), port(), Keyword.t()) :: :ok | Boltex.Error.t() + def rollback(transport, port, options \\ []) do + treat_simple_message(:rollback, transport, port, options) + end + + # Manage the sending and receiving of message: + # - without parameters + # - with only SUCCESS {} as result + # + # there fore, only `:ok` is sent back as a valid result + @spec treat_simple_message(Message.out_signature(), atom(), port(), Keyword.t()) :: + :ok | Boltex.Error.t() + defp treat_simple_message(signature, transport, port, options) do + send_message(transport, port, {signature, []}) case receive_data(transport, port, options) do {:success, %{}} -> :ok - error -> Boltex.Error.exception(error, port, :reset) + error -> Boltex.Error.exception(error, port, signature) end end diff --git a/lib/boltex/metadata.ex b/lib/boltex/metadata.ex new file mode 100644 index 0000000..f1a58ee --- /dev/null +++ b/lib/boltex/metadata.ex @@ -0,0 +1,68 @@ +defmodule Boltex.Metadata do + defstruct [:bookmarks, :tx_timeout, :metadata] + + @type t :: %__MODULE__{ + bookmarks: [String.t()], + tx_timeout: non_neg_integer(), + metadata: map() + } + + @spec new(map()) :: {:ok, Boltex.Metadata.t()} | {:error, String.t()} + def new(data) do + with {:ok, bookmarks} <- validate_bookmarks(Map.get(data, :bookmarks, [])), + {:ok, tx_timeout} <- validate_timeout(Map.get(data, :tx_timeout)), + {:ok, metadata} <- validate_metadata(Map.get(data, :metadata, %{})) do + {:ok, + %__MODULE__{ + bookmarks: bookmarks, + tx_timeout: tx_timeout, + metadata: metadata + }} + else + error -> error + end + end + + @spec to_map(Boltex.Metadata.t()) :: map() + def to_map(metadata) do + metadata + |> Map.from_struct() + |> Enum.filter(fn {_, value} -> value != nil end) + |> Enum.into(%{}) + end + + @spec validate_bookmarks(any()) :: {:ok, list()} | {:ok, nil} | {:error, String.t()} + defp validate_bookmarks(bookmarks) when is_list(bookmarks) and length(bookmarks) > 0 do + {:ok, bookmarks} + end + + defp validate_bookmarks([]) do + {:ok, nil} + end + + defp validate_bookmarks(_) do + {:error, "Invalid bookmkarks. Should be a list."} + end + + @spec validate_timeout(any()) :: {:ok, integer()} | {:error, String.t()} + defp validate_timeout(timeout) when (is_integer(timeout) and timeout > 0) or is_nil(timeout) do + {:ok, timeout} + end + + defp validate_timeout(_) do + {:error, "Invalid timeout. Should be a positive integer."} + end + + @spec validate_metadata(any()) :: {:ok, map()} | {:ok, nil} | {:error, String.t()} + defp validate_metadata(metadata) when is_map(metadata) and map_size(metadata) > 0 do + {:ok, metadata} + end + + defp validate_metadata(%{}) do + {:ok, nil} + end + + defp validate_metadata(_) do + {:error, "Invalid timeout. Should be a positive integer."} + end +end diff --git a/lib/boltex/pack_stream/message.ex b/lib/boltex/pack_stream/message.ex index d2ae662..0d5eb21 100644 --- a/lib/boltex/pack_stream/message.ex +++ b/lib/boltex/pack_stream/message.ex @@ -10,7 +10,17 @@ defmodule Boltex.PackStream.Message do alias Boltex.PackStream.Message.Decoder @type in_signature :: :failure | :ignored | :record | :success - @type out_signature :: :ack_failure | :discard_all | :hello | :init | :pull_all | :reset | :run + @type out_signature :: + :ack_failure + | :begin + | :commit + | :discard_all + | :hello + | :init + | :pull_all + | :reset + | :rollback + | :run @type raw :: {out_signature, list()} @type decoded :: {in_signature(), any()} @type encoded :: <<_::16, _::_*8>> diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index 09166a7..52206c7 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -15,11 +15,14 @@ defmodule Boltex.PackStream.Message.Encoder do @end_marker <<0x00, 0x00>> @ack_failure_signature 0x0E + @begin_signature 0x11 + @commit_signature 0x12 @discard_all_signature 0x2F @hello_signature 0x01 @init_signature 0x01 @pull_all_signature 0x3F @reset_signature 0x0F + @rollback_signature 0x13 @run_signature 0x10 @doc """ @@ -73,7 +76,7 @@ defmodule Boltex.PackStream.Message.Encoder do HELLO message is similar to INIT but is used by Bolt protocol v3 and higher ## Example: - iex(1)> Message.encode({:hello, []}) + iex> Message.encode({:hello, []}) <<0, 27, 177, 1, 161, 138, 117, 115, 101, 114, 95, 97, 103, 101, 110, 116, 140, 66, 111, 108, 116, 101, 120, 47, 48, 46, 53, 46, 48, 0, 0>> """ @@ -86,6 +89,22 @@ defmodule Boltex.PackStream.Message.Encoder do do_encode(:hello, [params]) end + @doc """ + Encode BEGIN message without metadata. + + COMMIT is used to open a transaction + """ + def encode({:begin, []}) do + encode({:begin, [%{}]}) + end + + @doc """ + Encode BEGIN message with metadata + """ + def encode({:begin, [%Boltex.Metadata{} = metadata]}) do + encode({:begin, [Boltex.Metadata.to_map(metadata)]}) + end + @doc """ Encode RUN message with its data: statement and parameters @@ -178,11 +197,14 @@ defmodule Boltex.PackStream.Message.Encoder do @spec signature(Boltex.PackStream.Message.out_signature()) :: integer() defp signature(:ack_failure), do: @ack_failure_signature + defp signature(:begin), do: @begin_signature + defp signature(:commit), do: @commit_signature defp signature(:discard_all), do: @discard_all_signature defp signature(:hello), do: @hello_signature defp signature(:init), do: @init_signature defp signature(:pull_all), do: @pull_all_signature defp signature(:reset), do: @reset_signature + defp signature(:rollback), do: @rollback_signature defp signature(:run), do: @run_signature @spec generate_chunks(Boltex.PackStream.value() | <<>>, list()) :: diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index d84cf6c..c9e1472 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -34,6 +34,7 @@ defmodule Boltex.BoltTest do assert %Boltex.Error{type: :cypher_error} = Bolt.run_statement(:gen_tcp, port, "What?") end + # ACK_FAILURE doesn't exists in Bolt V3! test "allows to recover from error with ack_failure for bolt v1 & v2", %{ port: port, bolt_version: bolt_version @@ -45,7 +46,6 @@ defmodule Boltex.BoltTest do end end - # RESET doesn't exists in Bolt V3! test "allows to recover from error with reset", %{ port: port } do @@ -101,7 +101,8 @@ defmodule Boltex.BoltTest do end end - test "Transactions work differently in v3", %{port: _port, bolt_version: _bolt_version} do + test "Transactions work differently in v3", %{port: port, bolt_version: bolt_version} do + test_transactions(port, bolt_version) end test "Temporal / spatial types does not work prior to bolt version 2", @@ -124,7 +125,23 @@ defmodule Boltex.BoltTest do assert [{:success, _}, {:success, _}] = Bolt.run_statement(:gen_tcp, port, "ROLLBACK") end - def test_transactions(_port, _) do + def test_transactions(port, _) do + # Simple transaction + assert :ok = Bolt.begin(:gen_tcp, port) + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + assert {:success, %{"bookmark" => _}} = Bolt.commit(:gen_tcp, port) + + # Transaction with metadata + assert :ok = Bolt.begin(:gen_tcp, port, %{bookmarks: ["neo4j:bookmark:v1:tx111"]}) + + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + assert {:success, %{"bookmark" => _}} = Bolt.commit(:gen_tcp, port) + + # Rollback transaction + assert :ok = Bolt.begin(:gen_tcp, port, %{bookmarks: ["neo4j:bookmark:v1:tx111"]}) + + assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") + assert :ok = Bolt.rollback(:gen_tcp, port) end @doc """ diff --git a/test/boltex/metadata_test.exs b/test/boltex/metadata_test.exs new file mode 100644 index 0000000..d671eff --- /dev/null +++ b/test/boltex/metadata_test.exs @@ -0,0 +1,69 @@ +defmodule Boltex.MetadataTest do + use ExUnit.Case, async: true + alias Boltex.Metadata + + @valid_metadata %{ + bookmarks: ["neo4j:bookmark:v1:tx1111"], + tx_timeout: 5000, + metadata: %{ + desc: "Not lost in transaction" + } + } + + describe "Create new metadata from map:" do + test "with compelete data" do + expected = %Metadata{ + bookmarks: ["neo4j:bookmark:v1:tx1111"], + tx_timeout: 5000, + metadata: %{ + desc: "Not lost in transaction" + } + } + + assert {:ok, result} = Metadata.new(@valid_metadata) + + assert expected == result + end + + test "return error with invalid bookmarks" do + data = Map.put(@valid_metadata, :bookmarks, "invalid") + assert {:error, _} = Metadata.new(data) + end + + test "return nil with empty bookmarks list" do + data = Map.put(@valid_metadata, :bookmarks, []) + + expected = %Metadata{ + bookmarks: nil, + tx_timeout: data.tx_timeout, + metadata: data.metadata + } + + assert {:ok, result} = Metadata.new(data) + assert expected == result + end + + test "return error with invalid timeout" do + data = Map.put(@valid_metadata, :tx_timeout, -12) + assert {:error, _} = Metadata.new(data) + end + + test "return error with invalid metadata" do + data = Map.put(@valid_metadata, :metadata, "invalid") + assert {:error, _} = Metadata.new(data) + end + + test "return nil with empty metadata map" do + data = Map.put(@valid_metadata, :metadata, %{}) + + expected = %Metadata{ + bookmarks: data.bookmarks, + tx_timeout: data.tx_timeout, + metadata: nil + } + + assert {:ok, result} = Metadata.new(data) + assert expected == result + end + end +end diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index 00b12b0..9d3fd8f 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -1,5 +1,6 @@ defmodule Boltex.PackStream.Message.EncoderTest do - use ExUnit.Case, async: true + # use ExUnit.Case, async: true + use Boltex.EncoderCase, async: true alias Boltex.PackStream.Message.Encoder defmodule TestUser do @@ -10,6 +11,18 @@ defmodule Boltex.PackStream.Message.EncoderTest do assert <<0x0, 0x2, 0xB0, 0xE, 0x0, 0x0>> = Encoder.encode({:ack_failure, []}) end + test "begin" do + assert <<0x0, 0x3, 0xB1, 0x11, 0xA0, 0x0, 0x0>> = Encoder.encode({:begin, []}) + + assert <<0x0, 0x11, 0xB1, 0x11, 0xA1, 0x8A, 0x74, 0x78, 0x5F, 0x74, 0x69, 0x6D, 0x65, 0x6F, + 0x75, 0x74, 0xC9, 0x13, 0x88, 0x0, + 0x0>> = Encoder.encode({:begin, [%{tx_timeout: 5000}]}) + end + + test "commit" do + assert <<0x0, 0x2, 0xB0, 0x12, 0x0, 0x0>> = Encoder.encode({:commit, []}) + end + test "discard_all" do assert <<0x0, 0x2, 0xB0, 0x2F, 0x0, 0x0>> = Encoder.encode({:discard_all, []}) end @@ -34,15 +47,36 @@ defmodule Boltex.PackStream.Message.EncoderTest do assert <<0x0, 0x2, 0xB0, 0xF, 0x0, 0x0>> = Encoder.encode({:reset, []}) end - test "run" do - assert <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, - 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, - 0x0>> = Encoder.encode({:run, ["RETURN 1 AS num"]}) + test "rollback" do + assert <<0x0, 0x2, 0xB0, 0x13, 0x0, 0x0>> = Encoder.encode({:rollback, []}) + end + + test "run", %{bolt_version: bolt_version} do + assert result_run_without_params(bolt_version) == Encoder.encode({:run, ["RETURN 1 AS num"]}) + + assert result_run_with_params(bolt_version) == + Encoder.encode({:run, ["RETURN {num} AS num", %{num: 5}]}) + end + + defp result_run_without_params(bolt_version) when bolt_version <= 2 do + <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, + 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, 0x0>> + end + + defp result_run_without_params(_bolt_version) do + <<0x0, 0x14, 0xB3, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, + 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0xA0, 0x0, 0x0>> + end + + defp result_run_with_params(bolt_version) when bolt_version <= 2 do + <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, + 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, 0x0>> + end - assert <<0x0, 0x1D, 0xB2, 0x10, 0xD0, 0x13, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x7B, - 0x6E, 0x75, 0x6D, 0x7D, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA1, 0x83, 0x6E, - 0x75, 0x6D, 0x5, 0x0, - 0x0>> = Encoder.encode({:run, ["RETURN {num} AS num", %{num: 5}]}) + defp result_run_with_params(_bolt_version) do + <<0x0, 0x1E, 0xB3, 0x10, 0xD0, 0x13, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x7B, 0x6E, + 0x75, 0x6D, 0x7D, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA1, 0x83, 0x6E, 0x75, 0x6D, + 0x5, 0xA0, 0x0, 0x0>> end test "bug fix: encoding struct fails" do diff --git a/test/boltex/pack_stream/message_test.exs b/test/boltex/pack_stream/message_test.exs index 3421fd3..7db5495 100644 --- a/test/boltex/pack_stream/message_test.exs +++ b/test/boltex/pack_stream/message_test.exs @@ -1,9 +1,9 @@ defmodule Boltex.PackStream.MessageTest do - use ExUnit.Case, async: true + use Boltex.IntegrationCase, async: true alias Boltex.PackStream.Message - test "Encode messages" do + test "Encode messages", %{bolt_version: bolt_version} do assert <<_::binary>> = Message.encode({:ack_failure, []}) assert <<_::binary>> = Message.encode({:discard_all, []}) assert <<_::binary>> = Message.encode({:init, []}) @@ -12,6 +12,15 @@ defmodule Boltex.PackStream.MessageTest do assert <<_::binary>> = Message.encode({:reset, []}) assert <<_::binary>> = Message.encode({:run, ["RETURN 1 AS num"]}) assert <<_::binary>> = Message.encode({:run, ["RETURN {num} AS num", %{num: 5}]}) + + if bolt_version >= 3 do + assert <<_::binary>> = + Message.encode({:run, ["RETURN {num} AS num", %{num: 5}, %{tx_timeou: 5000}]}) + + assert <<_::binary>> = Message.encode({:begin, []}) + assert <<_::binary>> = Message.encode({:commit, [%{tx_timeout: 5000}]}) + assert <<_::binary>> = Message.encode({:rollback, []}) + end end test "Decodes message" do diff --git a/test/support/encoder_case.ex b/test/support/encoder_case.ex new file mode 100644 index 0000000..8450443 --- /dev/null +++ b/test/support/encoder_case.ex @@ -0,0 +1,20 @@ +defmodule Boltex.EncoderCase do + use ExUnit.CaseTemplate + + alias Boltex.Bolt + alias Boltex.UriHelper + + setup do + # Some encoding function required to know anout the ptoocol version used, + # which is stored in the VersionAgent + # The following code just open a connection to know which protocol version + # to use and then close it + uri = UriHelper.get_info() + port_opts = [active: false, mode: :binary, packet: :raw] + {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) + {:ok, bolt_version} = Bolt.handshake(:gen_tcp, port) + :gen_tcp.close(port) + + {:ok, port: port, bolt_version: bolt_version} + end +end diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index 92f2e7a..743678c 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -2,18 +2,18 @@ defmodule Boltex.IntegrationCase do use ExUnit.CaseTemplate alias Boltex.Bolt + alias Boltex.UriHelper setup do - uri = neo4j_uri() + uri = UriHelper.get_info() port_opts = [active: false, mode: :binary, packet: :raw] {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) {:ok, version} = Bolt.handshake(:gen_tcp, port) - result = - cond do - version == 3 -> Bolt.hello(:gen_tcp, port, uri.userinfo) - version < 3 -> Bolt.init(:gen_tcp, port, uri.userinfo) - end + cond do + version == 3 -> Bolt.hello(:gen_tcp, port, uri.userinfo) + version < 3 -> Bolt.init(:gen_tcp, port, uri.userinfo) + end on_exit(fn -> :gen_tcp.close(port) @@ -21,20 +21,4 @@ defmodule Boltex.IntegrationCase do {:ok, port: port, bolt_version: version} end - - def neo4j_uri do - "bolt://neo4j:password@localhost:7687" - |> URI.merge(System.get_env("NEO4J_TEST_URL") || "") - |> URI.parse() - |> Map.update!(:host, &String.to_charlist/1) - |> Map.update!(:userinfo, fn - nil -> - {} - - userinfo -> - userinfo - |> String.split(":") - |> List.to_tuple() - end) - end end diff --git a/test/support/uri_helper.ex b/test/support/uri_helper.ex new file mode 100644 index 0000000..ab2c706 --- /dev/null +++ b/test/support/uri_helper.ex @@ -0,0 +1,21 @@ +defmodule Boltex.UriHelper do + @doc """ + Return connection info extracted from uri + """ + @spec get_info() :: map() + def get_info do + "bolt://neo4j:password@localhost:7687" + |> URI.merge(System.get_env("NEO4J_TEST_URL") || "") + |> URI.parse() + |> Map.update!(:host, &String.to_charlist/1) + |> Map.update!(:userinfo, fn + nil -> + {} + + userinfo -> + userinfo + |> String.split(":") + |> List.to_tuple() + end) + end +end From 340001cdbec768d8d00ebab5b792e140209675aa Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Sun, 27 Jan 2019 20:20:52 +0100 Subject: [PATCH 04/12] Implement GOODBYE message --- lib/boltex/bolt.ex | 21 +++++++++++++++++++ lib/boltex/pack_stream/message.ex | 1 + lib/boltex/pack_stream/message/encoder.ex | 2 ++ test/boltex/bolt_test.exs | 6 ++++++ .../pack_stream/message/encoder_test.exs | 4 ++++ test/boltex/pack_stream/message_test.exs | 4 +++- 6 files changed, 37 insertions(+), 1 deletion(-) diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index d13e035..b2cbf1d 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -206,6 +206,27 @@ defmodule Boltex.Bolt do end end + @doc """ + Implementation of Bolt's GOOBYE message. It closes the open connection. + + This message does NOT receive response from server! + + ## Options + + See "Shared options" in the documentation of this module. + """ + @spec goodbye(atom(), port()) :: :ok | Boltex.Error.t() + def goodbye(transport, port) do + send_message(transport, port, {:goodbye, []}) + + Port.close(port) + + case Port.info(port) do + nil -> :ok + _ -> Boltex.Error.exception("Can't close port", port, :goodbye) + end + end + @doc false # Sends a message using the Bolt protocol and PackStream encoding. # diff --git a/lib/boltex/pack_stream/message.ex b/lib/boltex/pack_stream/message.ex index 0d5eb21..86a882b 100644 --- a/lib/boltex/pack_stream/message.ex +++ b/lib/boltex/pack_stream/message.ex @@ -15,6 +15,7 @@ defmodule Boltex.PackStream.Message do | :begin | :commit | :discard_all + | :goodbye | :hello | :init | :pull_all diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index 52206c7..d7329ba 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -18,6 +18,7 @@ defmodule Boltex.PackStream.Message.Encoder do @begin_signature 0x11 @commit_signature 0x12 @discard_all_signature 0x2F + @goodbye_signature 0x02 @hello_signature 0x01 @init_signature 0x01 @pull_all_signature 0x3F @@ -200,6 +201,7 @@ defmodule Boltex.PackStream.Message.Encoder do defp signature(:begin), do: @begin_signature defp signature(:commit), do: @commit_signature defp signature(:discard_all), do: @discard_all_signature + defp signature(:goodbye), do: @goodbye_signature defp signature(:hello), do: @hello_signature defp signature(:init), do: @init_signature defp signature(:pull_all), do: @pull_all_signature diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index c9e1472..46b30c8 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -101,6 +101,12 @@ defmodule Boltex.BoltTest do end end + test "GOOBYE exists only in v3+", %{port: port, bolt_version: bolt_version} do + if bolt_version >= 3 do + assert :ok = Bolt.goodbye(:gen_tcp, port) + end + end + test "Transactions work differently in v3", %{port: port, bolt_version: bolt_version} do test_transactions(port, bolt_version) end diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index 9d3fd8f..bafb1b9 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -27,6 +27,10 @@ defmodule Boltex.PackStream.Message.EncoderTest do assert <<0x0, 0x2, 0xB0, 0x2F, 0x0, 0x0>> = Encoder.encode({:discard_all, []}) end + test "goodbye" do + assert <<0x0, 0x2, 0xB0, 0x02, 0x0, 0x0>> = Encoder.encode({:goodbye, []}) + end + test "hello" do assert <<0x0, _, 0xB1, 0x1, _::binary>> = Encoder.encode({:hello, []}) diff --git a/test/boltex/pack_stream/message_test.exs b/test/boltex/pack_stream/message_test.exs index 7db5495..53aeae9 100644 --- a/test/boltex/pack_stream/message_test.exs +++ b/test/boltex/pack_stream/message_test.exs @@ -18,8 +18,10 @@ defmodule Boltex.PackStream.MessageTest do Message.encode({:run, ["RETURN {num} AS num", %{num: 5}, %{tx_timeou: 5000}]}) assert <<_::binary>> = Message.encode({:begin, []}) - assert <<_::binary>> = Message.encode({:commit, [%{tx_timeout: 5000}]}) + assert <<_::binary>> = Message.encode({:begin, [%{tx_timeout: 5000}]}) + assert <<_::binary>> = Message.encode({:commit, []}) assert <<_::binary>> = Message.encode({:rollback, []}) + assert <<_::binary>> = Message.encode({:goodbye, []}) end end From 31528a0b9689042f71dfdd83d51818bee4a459ec Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 10:56:08 +0100 Subject: [PATCH 05/12] Fix tests --- lib/boltex.ex | 11 +++++++++-- test/boltex/pack_stream/message/encoder_test.exs | 5 +++-- test/boltex_test.exs | 5 ++++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/lib/boltex.ex b/lib/boltex.ex index 77d9177..9f3fe56 100644 --- a/lib/boltex.ex +++ b/lib/boltex.ex @@ -23,8 +23,15 @@ defmodule Boltex do def test(host, port, query, params \\ %{}, auth \\ {}) do {:ok, p} = :gen_tcp.connect(host, port, active: false, mode: :binary, packet: :raw) - :ok = Bolt.handshake(:gen_tcp, p) - {:ok, _info} = Bolt.init(:gen_tcp, p, auth) + {:ok, version} = Bolt.handshake(:gen_tcp, p) + + case version do + 3 -> + {:ok, _info} = Bolt.hello(:gen_tcp, p, auth) + + _ -> + {:ok, _info} = Bolt.init(:gen_tcp, p, auth) + end Bolt.run_statement(:gen_tcp, p, query, params) end diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index bafb1b9..c70fabb 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -73,8 +73,9 @@ defmodule Boltex.PackStream.Message.EncoderTest do end defp result_run_with_params(bolt_version) when bolt_version <= 2 do - <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, - 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, 0x0>> + <<0x0, 0x1D, 0xB2, 0x10, 0xD0, 0x13, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x7B, 0x6E, + 0x75, 0x6D, 0x7D, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA1, 0x83, 0x6E, 0x75, 0x6D, + 0x5, 0x0, 0x0>> end defp result_run_with_params(_bolt_version) do diff --git a/test/boltex_test.exs b/test/boltex_test.exs index a54fdfc..9e8855d 100644 --- a/test/boltex_test.exs +++ b/test/boltex_test.exs @@ -1,8 +1,11 @@ defmodule BoltexTest do use ExUnit.Case + alias Boltex.Bolt + alias Boltex.UriHelper + test "it works" do - uri = Boltex.IntegrationCase.neo4j_uri() + uri = UriHelper.get_info() Boltex.test(uri.host, uri.port, "RETURN 1 as num", %{}, uri.userinfo) end end From 4d8d42921cabe6f10a13eac2a202a9acd8537016 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 11:58:34 +0100 Subject: [PATCH 06/12] VersionAgent is now supervised to avoid error when non already started --- lib/boltex.ex | 24 ++++++++++++++++++- lib/boltex/bolt.ex | 2 +- lib/boltex/version_agent.ex | 16 +++++++++---- mix.exs | 4 ++-- .../pack_stream/message/encoder_test.exs | 14 ++++++----- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/lib/boltex.ex b/lib/boltex.ex index 9f3fe56..ff5a017 100644 --- a/lib/boltex.ex +++ b/lib/boltex.ex @@ -3,9 +3,31 @@ defmodule Boltex do # # It supports de- and encoding of Boltex binaries and sending and receiving # of data using the Bolt protocol. - + use Application alias Boltex.Bolt + # def start_link(opts) do + # Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + # end + + # @impl true + # def init(_args) do + # children = [ + # Boltex.VersionAgent + # ] + + # Supervisor.init(children, strategy: :one_for_one) + # end + + def start(_type, _args) do + children = [ + {Boltex.VersionAgent, []} + ] + + opts = [strategy: :one_for_one, name: __MODULE__] + Supervisor.start_link(children, opts) + end + @doc """ A simple function to test the library diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index b2cbf1d..3ca7a91 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -127,7 +127,7 @@ defmodule Boltex.Bolt do {:ok, <> = packet} when version <= @max_version -> Boltex.Logger.log_message(:server, :handshake, packet, :hex) Boltex.Logger.log_message(:server, :handshake, version) - Boltex.VersionAgent.start_link(version) + Boltex.VersionAgent.set(version) {:ok, version} {:ok, other} -> diff --git a/lib/boltex/version_agent.ex b/lib/boltex/version_agent.ex index 1b9af66..2bbc6c4 100644 --- a/lib/boltex/version_agent.ex +++ b/lib/boltex/version_agent.ex @@ -7,11 +7,11 @@ defmodule Boltex.VersionAgent do use Agent @doc """ - Start the agent with current protocol version + Start the agent with a default protocol version (version 1) """ - @spec start_link(integer()) :: {:error, any()} | {:ok, pid()} - def start_link(version) do - Agent.start_link(fn -> version end, name: __MODULE__) + @spec start_link(any()) :: {:error, any()} | {:ok, pid()} + def start_link(_) do + Agent.start_link(fn -> 1 end, name: __MODULE__) end @doc """ @@ -21,4 +21,12 @@ defmodule Boltex.VersionAgent do def get() do Agent.get(__MODULE__, & &1) end + + @doc """ + Set the current protocol version + """ + @spec set(integer()) :: :ok + def set(version) do + Agent.update(__MODULE__, fn _ -> version end) + end end diff --git a/mix.exs b/mix.exs index 8b54d39..2427f0e 100644 --- a/mix.exs +++ b/mix.exs @@ -26,8 +26,8 @@ defmodule Boltex.Mixfile do # Type "mix help compile.app" for more information def application do [ - # mod: {Boltex, []}, - applications: [:logger] + applications: [:logger], + mod: {Boltex, []} ] end diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index c70fabb..fe4368c 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -1,6 +1,5 @@ defmodule Boltex.PackStream.Message.EncoderTest do - # use ExUnit.Case, async: true - use Boltex.EncoderCase, async: true + use ExUnit.Case, async: true alias Boltex.PackStream.Message.Encoder defmodule TestUser do @@ -55,11 +54,14 @@ defmodule Boltex.PackStream.Message.EncoderTest do assert <<0x0, 0x2, 0xB0, 0x13, 0x0, 0x0>> = Encoder.encode({:rollback, []}) end - test "run", %{bolt_version: bolt_version} do - assert result_run_without_params(bolt_version) == Encoder.encode({:run, ["RETURN 1 AS num"]}) + test "run" do + for version <- [1..3] do + Boltex.VersionAgent.set(version) + assert result_run_without_params(version) == Encoder.encode({:run, ["RETURN 1 AS num"]}) - assert result_run_with_params(bolt_version) == - Encoder.encode({:run, ["RETURN {num} AS num", %{num: 5}]}) + assert result_run_with_params(version) == + Encoder.encode({:run, ["RETURN {num} AS num", %{num: 5}]}) + end end defp result_run_without_params(bolt_version) when bolt_version <= 2 do From cd062b672bfc7a900903d19446ba26eea01617b4 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 12:09:43 +0100 Subject: [PATCH 07/12] Generalize useage of %Boltex.Metadata{} --- lib/boltex/bolt.ex | 32 +++++++++++++++++++++-- lib/boltex/pack_stream/message/encoder.ex | 12 ++++++--- test/boltex_test.exs | 1 - 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index 3ca7a91..908abba 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -269,14 +269,42 @@ defmodule Boltex.Bolt do do_run_statement(transport, port, data, options) end + @doc """ + RUN message that accepts metadata. + Available since Bolt v3. + + Runs a statement (most likely Cypher statement) and returns a list of the + records and a summary (Act as as a RUN + PULL_ALL). + + Records are represented using PackStream's record data type. Their Elixir + representation is a Keyword with the indexes `:sig` and `:fields`. + + ## Options + + See "Shared options" in the documentation of this module. + + ## Examples + + iex> Boltex.Bolt.run_statement("MATCH (n {uuid: {uuid}}) RETURN n", + %{uuid: 5}, %{bookmarks: ["neo4j:bookmark:v1:tx3234"]}) + [ + {:success, %{"fields" => ["n"]}}, + {:record, [sig: 1, fields: [1, "Example", "Labels", %{"some_attribute" => "some_value"}]]}, + {:success, %{"type" => "r"}} + ] + """ @spec run_statement_with_metadata(atom(), port(), String.t(), map(), map(), Keyword.t()) :: [ Boltex.PackStream.Message.decoded() ] | Boltex.Error.t() def run_statement_with_metadata(transport, port, statement, params, metadata, options \\ []) do - data = [statement, params, metadata] - do_run_statement(transport, port, data, options) + with {:ok, run_metadata} <- Boltex.Metadata.new(metadata) do + data = [statement, params, run_metadata] + do_run_statement(transport, port, data, options) + else + {:error, error} -> Boltex.Error.exception(error, port, :begin) + end end defp do_run_statement(transport, port, data, options) do diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index d7329ba..665d1fe 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -9,6 +9,8 @@ defmodule Boltex.PackStream.Message.Encoder do # - message_type: atom amongst the valid message type (:init, :discard_all, :pull_all, :ack_failure, :reset, :run) # - data: a list of data to be used by the message + alias Boltex.Metadata + @client_name "Boltex/0.5.0" @max_chunk_size 65_535 @@ -102,8 +104,8 @@ defmodule Boltex.PackStream.Message.Encoder do @doc """ Encode BEGIN message with metadata """ - def encode({:begin, [%Boltex.Metadata{} = metadata]}) do - encode({:begin, [Boltex.Metadata.to_map(metadata)]}) + def encode({:begin, [%Metadata{} = metadata]}) do + encode({:begin, [Metadata.to_map(metadata)]}) end @doc """ @@ -132,10 +134,14 @@ defmodule Boltex.PackStream.Message.Encoder do encode_run(message, Boltex.VersionAgent.get()) end - def encode({:run, [_statment, _params]} = message) do + def encode({:run, [_statement, _params]} = message) do encode_run(message, Boltex.VersionAgent.get()) end + def encode({:run, [statement, params, %Metadata{} = metadata]}) do + do_encode(:run, [statement, params, Metadata.to_map(metadata)]) + end + @doc """ Encode all messages without data: ACK_FAILURE, DISCARD_ALL, PULL_ALL, RESET diff --git a/test/boltex_test.exs b/test/boltex_test.exs index 9e8855d..afb8849 100644 --- a/test/boltex_test.exs +++ b/test/boltex_test.exs @@ -1,7 +1,6 @@ defmodule BoltexTest do use ExUnit.Case - alias Boltex.Bolt alias Boltex.UriHelper test "it works" do From bbf16233b3ffd41d07102931e24138dfab3adb32 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 12:11:43 +0100 Subject: [PATCH 08/12] Add neo4j 3.5 to test environments --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ed809d5..6134a4a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ env: - NEO4J_VERSION=3.0 - NEO4J_VERSION=3.1 - NEO4J_VERSION=3.4 + - NEO4J_VERSION=3.5 elixir: - 1.3 - 1.4 From b20efe0ccd0ee1041db9d531fcdd2c904e416786 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 12:33:32 +0100 Subject: [PATCH 09/12] use Agent is not available in 1.3 nor 1.4 --- lib/boltex.ex | 4 +++- lib/boltex/version_agent.ex | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/boltex.ex b/lib/boltex.ex index ff5a017..8e1bab0 100644 --- a/lib/boltex.ex +++ b/lib/boltex.ex @@ -20,8 +20,10 @@ defmodule Boltex do # end def start(_type, _args) do + import Supervisor.Spec + children = [ - {Boltex.VersionAgent, []} + worker(Boltex.VersionAgent, []) ] opts = [strategy: :one_for_one, name: __MODULE__] diff --git a/lib/boltex/version_agent.ex b/lib/boltex/version_agent.ex index 2bbc6c4..208237b 100644 --- a/lib/boltex/version_agent.ex +++ b/lib/boltex/version_agent.ex @@ -4,13 +4,12 @@ defmodule Boltex.VersionAgent do Boltex user shouldn't be forced to store and pass protocol version to function, therefore we store it here """ - use Agent @doc """ Start the agent with a default protocol version (version 1) """ - @spec start_link(any()) :: {:error, any()} | {:ok, pid()} - def start_link(_) do + @spec start_link() :: {:error, any()} | {:ok, pid()} + def start_link() do Agent.start_link(fn -> 1 end, name: __MODULE__) end From 02f7614d9de4da36dd6f60da43f759da74f459ef Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Mon, 28 Jan 2019 17:45:28 +0100 Subject: [PATCH 10/12] server does not respond when there is bookmarks defined in medatada (since neo4j 3.5.2) --- test/boltex/bolt_test.exs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index 46b30c8..7ffb1a1 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -90,7 +90,7 @@ defmodule Boltex.BoltTest do metadata = %{ tx_timeout: 1000, - bookmarks: ["neo4j:bookmark:v1:tx16732"], + # bookmarks: ["neo4j:bookmark:v1:tx2"], Bookmark is causing timeout tx_metadata: %{ name: "my_tx" } @@ -138,13 +138,18 @@ defmodule Boltex.BoltTest do assert {:success, %{"bookmark" => _}} = Bolt.commit(:gen_tcp, port) # Transaction with metadata - assert :ok = Bolt.begin(:gen_tcp, port, %{bookmarks: ["neo4j:bookmark:v1:tx111"]}) + metadata = %{ + # bookmarks: ["neo4j:bookmark:v1:tx2"], Bookmark is causing timeout in neo4j 3.5.2 + tx_timeout: 1000 + } + + assert :ok = Bolt.begin(:gen_tcp, port, metadata) assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") assert {:success, %{"bookmark" => _}} = Bolt.commit(:gen_tcp, port) # Rollback transaction - assert :ok = Bolt.begin(:gen_tcp, port, %{bookmarks: ["neo4j:bookmark:v1:tx111"]}) + assert :ok = Bolt.begin(:gen_tcp, port, metadata) assert [{:success, _} | _] = Bolt.run_statement(:gen_tcp, port, "RETURN 1 as num") assert :ok = Bolt.rollback(:gen_tcp, port) From d8f81b9d49da928b431eb9d360bd321dc9f9fdb1 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Wed, 30 Jan 2019 14:05:29 +0100 Subject: [PATCH 11/12] Docs + bump version --- CHANGELOG.md | 4 ++ bolt_v3.md | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++ mix.exs | 2 +- 3 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 bolt_v3.md diff --git a/CHANGELOG.md b/CHANGELOG.md index ff6bb36..125ba3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v 0.6.0 +* Improvment + * Bolt V3 support + ## v0.5.1 * Fixes * A struct in a list is now well encoded diff --git a/bolt_v3.md b/bolt_v3.md new file mode 100644 index 0000000..45c381d --- /dev/null +++ b/bolt_v3.md @@ -0,0 +1,186 @@ +# BOLT V3 INFORMATION + +Since Neo4j 3.5, boltv3 is here! + +Nothing changes regarding types / structures, only messages are impacted. + +## Metadata +`Metadata` is a structure used as parameters by `RUN` and `BEGIN`. It is the same for both messages. +It purpose is not well defined yet. Needs research... + +Name | type | description +-----|------|------------ +bookmarks | list(string) | A list of bookmarks to be used +tx_timeout | int | the query timeout (milliseconds) +tx_metadata | map | ? + +## HELLO +This message replaces `INIT` and serves the same purpose. + +### Required parameters +Name | type | description +-----|------|------------ +user_agent | string | similar to _client_name_ used in `INIT` +scheme | string | | +credentials | string | | +principal | string | | + +#### Example +``HELLO %{credentials: "password", principal: "neo4j", scheme: "basic", user_agent: "Boltex/0.5.0"}`` + +### Return +`SUCCESS` +or +`FAILURE` with an error string + +When successfull, return contains the following data +Name | type | description +-----|------|------------ +server | string | The neo4j server version +connection_id | string || + +#### Example +``SUCCESS %{"connection_id" => "bolt-31", "server" => "Neo4j/3.5.1"}`` + +## RUN (Upated message) +`RUN` message still exists in bolt v1 and v2 and is used to send a query to the server. The server will respond with an acknowledglement and information about the result. Be aware that the result is not in the result. You have to send a `PULL_ALL` message to get the result. + +### Required parameters +Name | type | description +-----|------|------------ +_query | string | the query to be executed +_parameters | map | The parameters to be used by query | +_metadata | map | The metadata to be used (see metadata) | + +#### Examples +``"RETURN 1 as num" %{} %{}`` +``"MATCH (:Person {uid: {uid}} RETURN p as person" %{uid: 5} %{tx_timeout: 5000}`` + +### Return +`SUCCESS` with data +or +`FAILURE` with an error string + +When successfull, return contains the following data + +Name | type | description +-----|------|------------ +fields | list(string) | fields that will be found in result +t_first | int | When the result will be available (replaces `result_available_after`) + +#### Example +``SUCCESS %{"fields" => ["num"], "t_first" => 0}`` + +## PULL_ALL (Upated message) +`PULL_ALL` message still exists in bolt v1 and v2 is used to fetch result from a previous `RUN` message. + +### Required parameters +None + +#### Example +``PULL_ALL`` + +### Return +multiple `RECORD` messages and a final `SUCCESS` +or +`FAILURE` with an error string + +First, `PULL_ALL` will return `RECORD`s containing the query result. +When this stream is complete, it will return a `SUCCESS` message containing +Name | type | description +-----|------|------------ +bookmark | string | A bookmark name +stats | map | The stats summary (only for write operations) +t_last | int | time for result consumption (replaces `result_consumed_after`) +type | string | The operation type: **r**ead or **w**rite + +Examples: +``` +# Read +RECORD [1] +SUCCESS %{"bookmark" => "neo4j:bookmark:v1:tx16732", "t_last" => 0, "type" => "r"} + +# Write +SUCCESS ~ %{"bookmark" => "neo4j:bookmark:v1:tx16733", "stats" => %{"labels-added" => 1, "nodes-created" => 1, "properties-set" => 2}, "t_last" => 0, "type" => "w"} +``` + +## BEGIN +Signature: 0x11 + +`BEGIN` starts a transaction. + + +### Required parameters +Name | type | description +-----|------|------------ +_metadata | map | The metadata to be used (see metadata) | + +#### Example +``BEGIN %{}`` +``BEGIN %{bookmarks: ["neo4j:bookmark:v1:tx111"]}`` + +### Return +`SUCCESS` message without data +or +`FAILURE` with an error string + +#### Example +``SUCCESS %{}`` + +## COMMIT +Signature: 0x12 + +`COMMIT` commits the currently open transaction. + +### Required parameters +None + +#### Example +``COMMIT`` + +### Return +`SUCCESS` with data +or +`FAILURE` with an error string + +When successfull, return contains the following data: + +Name | type | description +-----|------|------------ +bookmark | string | A bookmark name + +#### Example +``SUCCESS %{"bookmark" => "neo4j:bookmark:v1:tx16732"}`` + +## ROLLBACK +Signature 0x13 + +`ROLLBACK` rollbacks the currently open transaction. + +### Required parameters +None + +#### Example +``ROLLBACK`` + +### Return +`SUCCESS` message without data +or +`FAILURE` with an error string + +#### Example +``SUCCESS %{}`` + +## GOODBYE +Signature: 0x02 + +`GOODBYE` closes the open connection + +### Required parameters +None + +#### Example +``GOODBYE`` + +### Return +Nothing because connection is closed. Server doesn't sent anything back! \ No newline at end of file diff --git a/mix.exs b/mix.exs index 2427f0e..548b99a 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Boltex.Mixfile do use Mix.Project - @version "0.5.1" + @version "0.6.0" def project do [ From 347d7006530d1c503ba86bde4006f86dedd1d648 Mon Sep 17 00:00:00 2001 From: Dominique VASSARD Date: Sat, 2 Feb 2019 13:57:09 +0100 Subject: [PATCH 12/12] Rebase + Add tests --- lib/boltex/bolt.ex | 12 +-- lib/boltex/pack_stream/message/encoder.ex | 6 +- test/boltex/bolt_test.exs | 79 ++++++++++++---- test/boltex/initialize_test.exs | 48 ++++++++++ test/boltex/metadata_test.exs | 19 ++++ .../pack_stream/message/encoder_test.exs | 90 ++++++++++++++++--- 6 files changed, 219 insertions(+), 35 deletions(-) create mode 100644 test/boltex/initialize_test.exs diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex index 908abba..a276bc9 100644 --- a/lib/boltex/bolt.ex +++ b/lib/boltex/bolt.ex @@ -219,11 +219,11 @@ defmodule Boltex.Bolt do def goodbye(transport, port) do send_message(transport, port, {:goodbye, []}) - Port.close(port) - - case Port.info(port) do - nil -> :ok - _ -> Boltex.Error.exception("Can't close port", port, :goodbye) + try do + Port.close(port) + :ok + rescue + ArgumentError -> Boltex.Error.exception("Can't close port", port, :goodbye) end end @@ -303,7 +303,7 @@ defmodule Boltex.Bolt do data = [statement, params, run_metadata] do_run_statement(transport, port, data, options) else - {:error, error} -> Boltex.Error.exception(error, port, :begin) + {:error, error} -> Boltex.Error.exception(error, port, :run) end end diff --git a/lib/boltex/pack_stream/message/encoder.ex b/lib/boltex/pack_stream/message/encoder.ex index 665d1fe..52ce15f 100644 --- a/lib/boltex/pack_stream/message/encoder.ex +++ b/lib/boltex/pack_stream/message/encoder.ex @@ -139,7 +139,11 @@ defmodule Boltex.PackStream.Message.Encoder do end def encode({:run, [statement, params, %Metadata{} = metadata]}) do - do_encode(:run, [statement, params, Metadata.to_map(metadata)]) + if Boltex.VersionAgent.get() >= 3 do + do_encode(:run, [statement, params, Metadata.to_map(metadata)]) + else + do_encode(:run, [statement, params]) + end end @doc """ diff --git a/test/boltex/bolt_test.exs b/test/boltex/bolt_test.exs index 7ffb1a1..ec85b29 100644 --- a/test/boltex/bolt_test.exs +++ b/test/boltex/bolt_test.exs @@ -83,34 +83,77 @@ defmodule Boltex.BoltTest do end end - test "RUN with metadata (Bolt >= 3)", %{port: port, bolt_version: bolt_version} do - if bolt_version >= 3 do - assert [{:success, _}, {:record, _}, {:success, _}] = - Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, %{}) - - metadata = %{ - tx_timeout: 1000, - # bookmarks: ["neo4j:bookmark:v1:tx2"], Bookmark is causing timeout - tx_metadata: %{ - name: "my_tx" - } + test "RUN with metadata (Bolt >= 3), in bolt < 3 metadata are stripped", %{ + port: port + } do + assert [{:success, _}, {:record, _}, {:success, _}] = + Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, %{}) + + metadata = %{ + tx_timeout: 1000, + # bookmarks: ["neo4j:bookmark:v1:tx2"], Bookmark is causing timeout + tx_metadata: %{ + name: "my_tx" } + } - assert [{:success, _}, {:record, _}, {:success, _}] = - Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, metadata) - end + assert [{:success, _}, {:record, _}, {:success, _}] = + Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, metadata) end - test "GOOBYE exists only in v3+", %{port: port, bolt_version: bolt_version} do - if bolt_version >= 3 do - assert :ok = Bolt.goodbye(:gen_tcp, port) - end + test "RUN with metadata fails with erroneous metadata", %{port: port} do + invalid = %{ + tx_timeout: -5 + } + + assert %Boltex.Error{} = + Bolt.run_statement_with_metadata(:gen_tcp, port, "RETURN 1 AS num", %{}, invalid) + end + + test "GOOBYE exists only in v3+ but has no impact on other version", %{ + port: port + # bolt_version: bolt_version + } do + # if bolt_version >= 3 do + assert :ok = Bolt.goodbye(:gen_tcp, port) + # end + end + + test "GOODBYE fails if port is already closed", %{port: port} do + Port.close(port) + assert %Boltex.Error{} = Bolt.goodbye(:gen_tcp, port) end test "Transactions work differently in v3", %{port: port, bolt_version: bolt_version} do test_transactions(port, bolt_version) end + test "BEGIN fails in Bolt version < 3", %{port: port, bolt_version: bolt_version} do + if bolt_version < 3 do + assert %Boltex.Error{} = Bolt.begin(:gen_tcp, port) + end + end + + test "BEGIN fails if metadata are erroneous", %{port: port} do + invalid = %{ + tx_timeout: -5 + } + + assert %Boltex.Error{} = Bolt.begin(:gen_tcp, port, invalid) + end + + test "COMMIT fails in Bolt version < 3", %{port: port, bolt_version: bolt_version} do + if bolt_version < 3 do + assert %Boltex.Error{} = Bolt.commit(:gen_tcp, port) + end + end + + test "ROLLBACK fails in Bolt version < 3", %{port: port, bolt_version: bolt_version} do + if bolt_version < 3 do + assert %Boltex.Error{} = Bolt.rollback(:gen_tcp, port) + end + end + test "Temporal / spatial types does not work prior to bolt version 2", %{ port: port, diff --git a/test/boltex/initialize_test.exs b/test/boltex/initialize_test.exs new file mode 100644 index 0000000..3b84916 --- /dev/null +++ b/test/boltex/initialize_test.exs @@ -0,0 +1,48 @@ +defmodule Boltex.InitializeTest do + use ExUnit.Case + + alias Boltex.Bolt + alias Boltex.UriHelper + + test "HANDSHAKE return version on success" do + uri = UriHelper.get_info() + port_opts = [active: false, mode: :binary, packet: :raw] + {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) + + assert {:ok, version} = Bolt.handshake(:gen_tcp, port) + + assert version in 1..3 + end + + test "HELLO works only in Bolt version >= 3" do + uri = UriHelper.get_info() + port_opts = [active: false, mode: :binary, packet: :raw] + {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) + + {:ok, version} = Bolt.handshake(:gen_tcp, port) + + res = Bolt.hello(:gen_tcp, port, uri.userinfo) + + if version >= 3 do + assert {:ok, _} = res + else + assert {:error, _} = res + end + end + + test "INIT works only in Bolt version < 3" do + uri = UriHelper.get_info() + port_opts = [active: false, mode: :binary, packet: :raw] + {:ok, port} = :gen_tcp.connect(uri.host, uri.port, port_opts) + + {:ok, version} = Bolt.handshake(:gen_tcp, port) + + res = Bolt.init(:gen_tcp, port, uri.userinfo) + + if version >= 3 do + assert {:error, _} = res + else + assert {:ok, _} = res + end + end +end diff --git a/test/boltex/metadata_test.exs b/test/boltex/metadata_test.exs index d671eff..ab7243f 100644 --- a/test/boltex/metadata_test.exs +++ b/test/boltex/metadata_test.exs @@ -66,4 +66,23 @@ defmodule Boltex.MetadataTest do assert expected == result end end + + test "to_map remove nullified data" do + data = %{ + bookmarks: ["neo4j:bookmark:v1:tx1111"], + tx_timeout: 5000 + } + + assert {:ok, metadata} = Metadata.new(data) + + assert %Metadata{bookmarks: ["neo4j:bookmark:v1:tx1111"], metadata: nil, tx_timeout: 5000} == + metadata + + expected = %{ + bookmarks: ["neo4j:bookmark:v1:tx1111"], + tx_timeout: 5000 + } + + assert expected == Metadata.to_map(metadata) + end end diff --git a/test/boltex/pack_stream/message/encoder_test.exs b/test/boltex/pack_stream/message/encoder_test.exs index fe4368c..7441ee6 100644 --- a/test/boltex/pack_stream/message/encoder_test.exs +++ b/test/boltex/pack_stream/message/encoder_test.exs @@ -1,6 +1,7 @@ defmodule Boltex.PackStream.Message.EncoderTest do - use ExUnit.Case, async: true + use Boltex.IntegrationCase alias Boltex.PackStream.Message.Encoder + alias Boltex.Metadata defmodule TestUser do defstruct name: "", bolt_sips: true @@ -55,7 +56,7 @@ defmodule Boltex.PackStream.Message.EncoderTest do end test "run" do - for version <- [1..3] do + for version <- 1..3 do Boltex.VersionAgent.set(version) assert result_run_without_params(version) == Encoder.encode({:run, ["RETURN 1 AS num"]}) @@ -64,6 +65,30 @@ defmodule Boltex.PackStream.Message.EncoderTest do end end + test "run with metadata" do + {:ok, metadata} = Metadata.new(%{tx_timeout: 5000}) + + for version <- 1..3 do + Boltex.VersionAgent.set(version) + + assert result_run_with_metadata_without_params(version) == + Encoder.encode({:run, ["RETURN 1 AS num", %{}, metadata]}) + + assert result_run_with_metadata_with_params(version) == + Encoder.encode({:run, ["RETURN {num} AS num", %{num: 5}, metadata]}) + end + end + + test "bug fix: run struct" do + query = "CREATE (n:User {props})" + params = %{props: %TestUser{bolt_sips: true, name: "Strut"}} + + for version <- 1..3 do + Boltex.VersionAgent.set(version) + assert result_struct(version) == Encoder.encode({:run, [query, params]}) + end + end + defp result_run_without_params(bolt_version) when bolt_version <= 2 do <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, 0x0>> @@ -86,14 +111,59 @@ defmodule Boltex.PackStream.Message.EncoderTest do 0x5, 0xA0, 0x0, 0x0>> end - test "bug fix: encoding struct fails" do - query = "CREATE (n:User {props})" - params = %{props: %TestUser{bolt_sips: true, name: "Strut"}} + defp result_run_with_metadata_without_params(bolt_version) when bolt_version <= 2 do + <<0x0, 0x13, 0xB2, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, + 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0x0, 0x0>> + end + + defp result_run_with_metadata_without_params(_bolt_version) do + <<0x0, 0x22, 0xB3, 0x10, 0x8F, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x31, 0x20, 0x41, + 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA0, 0xA1, 0x8A, 0x74, 0x78, 0x5F, 0x74, 0x69, 0x6D, 0x65, + 0x6F, 0x75, 0x74, 0xC9, 0x13, 0x88, 0x0, 0x0>> + end + + defp result_run_with_metadata_with_params(bolt_version) when bolt_version <= 2 do + <<0x0, 0x1D, 0xB2, 0x10, 0xD0, 0x13, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x7B, 0x6E, + 0x75, 0x6D, 0x7D, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA1, 0x83, 0x6E, 0x75, 0x6D, + 0x5, 0x0, 0x0>> + end + + defp result_run_with_metadata_with_params(_bolt_version) do + <<0x0, 0x2C, 0xB3, 0x10, 0xD0, 0x13, 0x52, 0x45, 0x54, 0x55, 0x52, 0x4E, 0x20, 0x7B, 0x6E, + 0x75, 0x6D, 0x7D, 0x20, 0x41, 0x53, 0x20, 0x6E, 0x75, 0x6D, 0xA1, 0x83, 0x6E, 0x75, 0x6D, + 0x5, 0xA1, 0x8A, 0x74, 0x78, 0x5F, 0x74, 0x69, 0x6D, 0x65, 0x6F, 0x75, 0x74, 0xC9, 0x13, + 0x88, 0x0, 0x0>> + end + + # test "bug fix: encoding struct fails" do + # query = "CREATE (n:User {props})" + # params = %{props: %TestUser{bolt_sips: true, name: "Strut"}} + + # res = Encoder.encode({:run, [query, params]}) + + # IO.puts(inspect(res, base: :hex, limit: :infinity)) + + # assert result_struc() = Encoder.encode({:run, [query, params]}) + + # # assert <<0x0, 0x39, 0xB2, 0x10, 0xD0, 0x17, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x20, 0x28, + # # 0x6E, 0x3A, 0x55, 0x73, 0x65, 0x72, 0x20, 0x7B, 0x70, 0x72, 0x6F, 0x70, 0x73, 0x7D, + # # 0x29, 0xA1, 0x85, 0x70, 0x72, 0x6F, 0x70, 0x73, 0xA2, 0x89, 0x62, 0x6F, 0x6C, 0x74, + # # 0x5F, 0x73, 0x69, 0x70, 0x73, 0xC3, 0x84, _::binary>> = res + # end + + defp result_struct(bolt_version) when bolt_version <= 2 do + <<0x0, 0x39, 0xB2, 0x10, 0xD0, 0x17, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x20, 0x28, 0x6E, + 0x3A, 0x55, 0x73, 0x65, 0x72, 0x20, 0x7B, 0x70, 0x72, 0x6F, 0x70, 0x73, 0x7D, 0x29, 0xA1, + 0x85, 0x70, 0x72, 0x6F, 0x70, 0x73, 0xA2, 0x89, 0x62, 0x6F, 0x6C, 0x74, 0x5F, 0x73, 0x69, + 0x70, 0x73, 0xC3, 0x84, 0x6E, 0x61, 0x6D, 0x65, 0x85, 0x53, 0x74, 0x72, 0x75, 0x74, 0x0, + 0x0>> + end - assert <<0x0, 0x39, 0xB2, 0x10, 0xD0, 0x17, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x20, 0x28, - 0x6E, 0x3A, 0x55, 0x73, 0x65, 0x72, 0x20, 0x7B, 0x70, 0x72, 0x6F, 0x70, 0x73, 0x7D, - 0x29, 0xA1, 0x85, 0x70, 0x72, 0x6F, 0x70, 0x73, 0xA2, 0x89, 0x62, 0x6F, 0x6C, 0x74, - 0x5F, 0x73, 0x69, 0x70, 0x73, 0xC3, 0x84, - _::binary>> = Encoder.encode({:run, [query, params]}) + defp result_struct(_) do + <<0x0, 0x3A, 0xB3, 0x10, 0xD0, 0x17, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x20, 0x28, 0x6E, + 0x3A, 0x55, 0x73, 0x65, 0x72, 0x20, 0x7B, 0x70, 0x72, 0x6F, 0x70, 0x73, 0x7D, 0x29, 0xA1, + 0x85, 0x70, 0x72, 0x6F, 0x70, 0x73, 0xA2, 0x89, 0x62, 0x6F, 0x6C, 0x74, 0x5F, 0x73, 0x69, + 0x70, 0x73, 0xC3, 0x84, 0x6E, 0x61, 0x6D, 0x65, 0x85, 0x53, 0x74, 0x72, 0x75, 0x74, 0xA0, + 0x0, 0x0>> end end