diff --git a/scripts/build-preloaded-store.escript b/scripts/build-preloaded-store.escript index 83a0ccca2..62ba90a08 100755 --- a/scripts/build-preloaded-store.escript +++ b/scripts/build-preloaded-store.escript @@ -90,6 +90,8 @@ hb_opts_compile_opts(Ebin) -> end. drop_outdir([{outdir, _} | Rest]) -> drop_outdir(Rest); +drop_outdir([{d, Name, Value} | Rest]) when is_list(Name) -> + [{d, list_to_atom(Name), Value} | drop_outdir(Rest)]; drop_outdir([Opt | Rest]) -> [Opt | drop_outdir(Rest)]; drop_outdir([]) -> []. diff --git a/scripts/hyper-token.lua b/scripts/hyper-token.lua index 237542a0a..233421724 100644 --- a/scripts/hyper-token.lua +++ b/scripts/hyper-token.lua @@ -119,11 +119,25 @@ function count_common(a, b) if type(a) ~= "table" then a = { a } end if type(b) ~= "table" then b = { b } end + -- local count = 0 + -- for _, v in ipairs(a) do + -- for _, w in ipairs(b) do + -- if v == w then + -- count = count + 1 + -- end + -- end + -- end + + local seen = {} local count = 0 for _, v in ipairs(a) do - for _, w in ipairs(b) do - if v == w then - count = count + 1 + if not seen[v] then + seen[v] = true + for _, w in ipairs(b) do + if v == w then + count = count + 1 + break + end end end end @@ -895,4 +909,4 @@ function compute(base, assignment) ao.event({ "Process initialized.", { slot = assignment.slot } }) return "ok", base end -end \ No newline at end of file +end diff --git a/src/core/http/hb_client_remote.erl b/src/core/http/hb_client_remote.erl index 05c7e5df5..7489d32e1 100644 --- a/src/core/http/hb_client_remote.erl +++ b/src/core/http/hb_client_remote.erl @@ -98,7 +98,13 @@ upload(Msg, Opts) -> end, hb_message:commitment_devices(Msg, Opts) ), - {ok, UploadResults}. + case lists:filter(fun upload_failed/1, UploadResults) of + [] -> {ok, UploadResults}; + Errors -> {error, Errors} + end. +upload_failed({error, _}) -> true; +upload_failed({failure, _}) -> true; +upload_failed(_) -> false. upload(Msg, Opts, <<"httpsig@1.0">>) -> case hb_opts:get(bundler_httpsig, not_found, Opts) of not_found -> @@ -107,13 +113,17 @@ upload(Msg, Opts, <<"httpsig@1.0">>) -> ?event({uploading_item, Msg}), hb_http:post(Bundler, <<"/tx">>, Msg, Opts) end; -upload(Msg, Opts, _CommitmentDevice) -> +upload(Msg, Opts, CommitmentDevice) -> ?event({uploading_item, Msg}), hb_ao:raw( <<"arweave@2.9">>, <<"tx">>, - #{}, - Msg#{ <<"method">> => <<"POST">> }, + Msg, + #{ + <<"method">> => <<"POST">>, + <<"target">> => <<"base">>, + <<"commitment-device">> => CommitmentDevice + }, Opts ). diff --git a/src/core/http/hb_http.erl b/src/core/http/hb_http.erl index 241d5e63a..f912169ff 100644 --- a/src/core/http/hb_http.erl +++ b/src/core/http/hb_http.erl @@ -638,9 +638,15 @@ encode_reply(Status, TABMReq, Message, Opts) -> end, Opts ), + DefaultAcceptBundle = + case {Codec, hb_maps:get(<<"require-codec">>, TABMReq, not_found, Opts)} of + {<<"json@1.0">>, not_found} -> false; + {<<"json@1.0">>, _} -> true; + _ -> false + end, AcceptBundle = hb_util:atom( - hb_maps:get(<<"accept-bundle">>, TABMReq, false, Opts) + hb_maps:get(<<"accept-bundle">>, TABMReq, DefaultAcceptBundle, Opts) ), ?event(debug_http, {encoding_reply, @@ -760,6 +766,7 @@ encode_reply(Status, TABMReq, Message, Opts) -> andalso not is_list(V) andalso Key =/= <<"body">> andalso Key =/= <<"data">> + andalso Key =/= <<"content-length">> end, Message, Opts @@ -1054,6 +1061,16 @@ normalize_unsigned(PrimMsg, Req = #{ headers := RawHeaders }, Msg, Opts) -> ), FilterKeys = hb_opts:get(http_inbound_filter_keys, ?DEFAULT_FILTER_KEYS, Opts), FilteredMsg = hb_message:without_unless_signed(FilterKeys, Msg, Opts), + DefaultAcceptBundle = + case maps:get( + <<"require-codec">>, + Msg, + maps:get(<<"require-codec">>, PrimMsg, not_found) + ) of + <<"application/json">> -> true; + <<"json@1.0">> -> true; + _ -> maps:get(<<"accept-bundle">>, RawHeaders, false) + end, BaseMsg = FilteredMsg#{ <<"method">> => Method, @@ -1065,7 +1082,7 @@ normalize_unsigned(PrimMsg, Req = #{ headers := RawHeaders }, Msg, Opts) -> maps:get( <<"accept-bundle">>, PrimMsg, - maps:get(<<"accept-bundle">>, RawHeaders, false) + DefaultAcceptBundle ) ), <<"accept">> => @@ -1284,6 +1301,21 @@ cors_get_test() -> hb_ao:get(<<"access-control-allow-origin">>, Res, LocalOpts) ). +content_length_not_forwarded_for_encoded_reply_test() -> + {200, Headers, EncodedBody} = + encode_reply( + 200, + #{ <<"require-codec">> => <<"application/json">> }, + #{ + <<"content-length">> => <<"999">>, + <<"status">> => 200, + <<"body">> => <<"ok">> + }, + #{} + ), + ?assertNot(maps:is_key(<<"content-length">>, Headers)), + ?assert(byte_size(EncodedBody) > 0). + ans104_wasm_test() -> ServerStore = [hb_test_utils:test_store()], ServerOpts = diff --git a/src/core/resolver/hb_cache_control.erl b/src/core/resolver/hb_cache_control.erl index 85746bf0d..665bcade1 100644 --- a/src/core/resolver/hb_cache_control.erl +++ b/src/core/resolver/hb_cache_control.erl @@ -56,7 +56,14 @@ lookup(Base, Req, Opts) -> Opts, hb_opts:get(store_scope_resolved, local, Opts) ), - case hb_cache:read_resolved(Base, Req, OutputScopedOpts) of + CacheRead = + try hb_cache:read_resolved(Base, Req, OutputScopedOpts) of + ReadRes -> ReadRes + catch + throw:{necessary_message_not_found, _, _} -> + miss + end, + case CacheRead of {hit, not_found} -> {error, not_found}; {hit, {ok, Res}} -> diff --git a/src/core/resolver/hb_opts.erl b/src/core/resolver/hb_opts.erl index 4d89c1dda..5c9bf763f 100644 --- a/src/core/resolver/hb_opts.erl +++ b/src/core/resolver/hb_opts.erl @@ -531,7 +531,7 @@ raw_default_message() -> } ] }, - <<"scheduler-default-commitment-spec">> => <<"httpsig@1.0">>, + <<"scheduler-default-commitment-spec">> => <<"ans104@1.0">>, <<"genesis-wasm-import-authorities">> => [ <<"WjnS-s03HWsDSdMnyTdzB1eHZB2QheUWP_FVRVYxkXk">> diff --git a/src/preloaded/arweave/dev_arweave.erl b/src/preloaded/arweave/dev_arweave.erl index 88ca6523d..de3be7f5d 100644 --- a/src/preloaded/arweave/dev_arweave.erl +++ b/src/preloaded/arweave/dev_arweave.erl @@ -47,11 +47,16 @@ tx(Base, Request, Opts) -> %% you should use the ~bundler@1.0 device. post_tx(Base, RawRequest, Opts) -> {ok, Request} = extract_target(Base, RawRequest, Opts), - case hb_maps:find(<<"commitment-device">>, Request, Opts) of + case hb_maps:find(<<"commitment-device">>, RawRequest, Opts) of {ok, Device} -> post_tx(Base, Request, Opts, Device); error -> - post_tx_detect_device(Base, Request, Opts) + case hb_maps:find(<<"commitment-device">>, Request, Opts) of + {ok, Device} -> + post_tx(Base, Request, Opts, Device); + error -> + post_tx_detect_device(Base, Request, Opts) + end end. %% @doc Detect the commitment device to use when posting a transaction. diff --git a/src/preloaded/codec/dev_json.erl b/src/preloaded/codec/dev_json.erl index dffff0820..807dade03 100644 --- a/src/preloaded/codec/dev_json.erl +++ b/src/preloaded/codec/dev_json.erl @@ -27,9 +27,10 @@ to(Msg, Req, Opts) -> tabm, ConvOpts ), + Bundle = hb_maps:get(<<"bundle">>, Req, false, Opts), Loaded = - case hb_maps:get(<<"bundle">>, Req, false, Opts) of - true -> hb_cache:ensure_all_loaded(Restructured, Opts); + case Bundle of + true -> load_available_links(hb_link:decode_all_links(Restructured), Opts); false -> Restructured end, JSONStructured = @@ -38,12 +39,37 @@ to(Msg, Req, Opts) -> tabm, #{ <<"device">> => <<"structured@1.0">>, + <<"bundle">> => Bundle, <<"encode-types">> => [<<"atom">>] }, ConvOpts ), {ok, hb_json:encode(JSONStructured)}. +%% @doc Eager-load resolvable links for bundled JSON responses, while leaving +%% missing lazy links in place so optional response fields do not fail encoding. +load_available_links(Msg, Opts) -> + load_available_links([], Msg, Opts). + +load_available_links(_Ref, Link, Opts) when ?IS_LINK(Link) -> + try hb_cache:ensure_loaded(Link, Opts) of + Loaded -> load_available_links([], Loaded, Opts) + catch + throw:{necessary_message_not_found, _, _} -> Link + end; +load_available_links(Ref, Msg, Opts) when is_map(Msg) -> + maps:map( + fun(K, V) -> load_available_links([K|Ref], V, Opts) end, + Msg + ); +load_available_links(Ref, Msg, Opts) when is_list(Msg) -> + lists:map( + fun({N, V}) -> load_available_links([N|Ref], V, Opts) end, + hb_util:number(Msg) + ); +load_available_links(_Ref, Msg, _Opts) -> + Msg. + %% @doc Decode a JSON string to a message. from(Map, _Req, _Opts) when is_map(Map) -> {ok, Map}; from(JSON, Req, Opts) -> diff --git a/src/preloaded/codec/dev_json_iface.erl b/src/preloaded/codec/dev_json_iface.erl index b37621d6c..b4b35b6e1 100644 --- a/src/preloaded/codec/dev_json_iface.erl +++ b/src/preloaded/codec/dev_json_iface.erl @@ -208,7 +208,10 @@ prepare_tags(Msg, Opts) -> {ok, OriginalTags} -> Res = hb_util:message_to_ordered_list(OriginalTags), ?event({using_original_tags, Res}), - Res; + case complete_tags(Res) of + true -> Res; + false -> prepare_header_case_tags(Msg, Opts) + end; error -> prepare_header_case_tags(Msg, Opts) end; @@ -216,6 +219,14 @@ prepare_tags(Msg, Opts) -> prepare_header_case_tags(Msg, Opts) end. +complete_tags(Tags) -> + lists:all( + fun(#{ <<"name">> := _, <<"value">> := _ }) -> true; + (_) -> false + end, + Tags + ). + %% @doc Convert a message without an `original-tags' field into a list of %% key-value pairs, with the keys in HTTP header-case. prepare_header_case_tags(TABM, Opts) -> diff --git a/src/preloaded/codec/lib_arweave_common.erl b/src/preloaded/codec/lib_arweave_common.erl index aa0951aea..8a4fd8a90 100644 --- a/src/preloaded/codec/lib_arweave_common.erl +++ b/src/preloaded/codec/lib_arweave_common.erl @@ -535,7 +535,9 @@ original_tags_to_tags(TagMap) -> ?event({ordered_tagmap, {explicit, OrderedList}, {input, {explicit, TagMap}}}), lists:map( fun(#{ <<"name">> := Key, <<"value">> := Value }) -> - {Key, Value} + {Key, Value}; + (#{ <<"name">> := Key }) -> + {Key, <<>>} end, OrderedList ). diff --git a/src/preloaded/node/dev_router.erl b/src/preloaded/node/dev_router.erl index aa332e860..47ee979ea 100644 --- a/src/preloaded/node/dev_router.erl +++ b/src/preloaded/node/dev_router.erl @@ -286,12 +286,19 @@ route(_, Msg, Opts) -> ), Nodes ), - Chosen = choose(ChooseN, Strategy, Msg, Nodes, Opts), + RouteMsg = + route_selection_message( + Msg, + RouteWithAppliedNodes, + Opts + ), + Chosen = + choose(ChooseN, Strategy, RouteMsg, Nodes, Opts), ?event({choose, {strategy, Strategy}, {choose_n, ChooseN}, {nodes, Nodes}, - {msg, Msg}, + {msg, RouteMsg}, {chosen, Chosen} }), case Chosen of @@ -355,6 +362,61 @@ apply_routes(Msg, R, Opts) -> ?event({nodes_after_apply, NodesWithRouteApplied}), R#{ <<"nodes">> => NodesWithRouteApplied }. +%% @doc Apply a matched route's configured routing key to a request message. +route_selection_message(Msg, Route, Opts) -> + case hb_maps:get(<<"route-by">>, Route, not_found, Opts) of + not_found -> + Msg; + Spec -> + case route_by(Spec, Msg, Opts) of + not_found -> Msg; + RouteBy -> Msg#{ <<"route-by">> => RouteBy } + end + end. + +route_by(Spec, Msg, Opts) when is_integer(Spec) -> + route_by_prefix(Spec, Msg, Opts); +route_by(Spec, Msg, Opts) when is_binary(Spec) -> + case safe_to_integer(Spec) of + {ok, Int} -> route_by_prefix(Int, Msg, Opts); + error -> route_by_regex(Spec, Msg, Opts) + end; +route_by(_, _, _) -> + not_found. + +route_by_prefix(N, Msg, Opts) when N > 0 -> + case route_path(Msg, Opts) of + not_found -> not_found; + Path -> + PathWithoutSlash = strip_leading_slash(Path), + case byte_size(PathWithoutSlash) >= N of + true -> binary:part(PathWithoutSlash, 0, N); + false -> not_found + end + end; +route_by_prefix(_, _, _) -> + not_found. + +route_by_regex(Regex, Msg, Opts) -> + case route_path(Msg, Opts) of + not_found -> + not_found; + Path -> + case re:run(Path, Regex, [{capture, [1], binary}]) of + {match, [RouteBy]} -> RouteBy; + _ -> not_found + end + end. + +route_path(Msg, Opts) -> + case hb_util:find_target_path(Msg, Opts) of + no_path -> not_found; + {_TargetKey, Path} -> hb_path:normalize(hb_path:to_binary(Path)) + end. + +strip_leading_slash(<<"/", Rest/binary>>) -> Rest; +strip_leading_slash(Path) -> Path. + %% @doc Apply a node map's rules for transforming the path of the message. %% Supports the following keys: %% - `opts': A map of options to pass to the request. @@ -570,6 +632,9 @@ choose(N, <<"Range">>, #{ <<"route-by">> := RouteBy }, Nodes, Opts) -> Nodes ), lists:sublist(FilteredNodes, min(length(FilteredNodes), N)); +choose(N, <<"Nearest">>, #{ <<"route-by">> := HashPath }, Nodes, Opts) + when is_binary(HashPath) -> + choose(N, <<"Nearest">>, normalize_hashpath(HashPath), Nodes, Opts); choose(N, <<"Nearest">>, #{ <<"path">> := HashPath }, Nodes, Opts) when is_binary(HashPath) -> choose(N, <<"Nearest">>, normalize_hashpath(HashPath), Nodes, Opts); @@ -727,11 +792,11 @@ binary_to_bignum(Bin) when ?IS_ID(Bin) -> %% @doc Preprocess a request to check if it should be relayed to a different node. preprocess(Base, RawReq, Opts) -> Req = hb_ao:get(<<"request">>, RawReq, Opts#{ <<"hashpath">> => ignore }), - ?event(debug_preprocess, {called_preprocess,Req}), + % ?event(debug_preprocess1, {called_preprocess,Req}), TemplateRoutes = load_routes(Opts), - ?event(debug_preprocess, {template_routes, TemplateRoutes}), - Res = hb_http:message_to_request(Req, Opts), - ?event(debug_preprocess, {match, Res}), + % ?event(debug_preprocess1, {template_routes, TemplateRoutes}), + Res = route_request(Req, RawReq, Opts), + % ?event(debug_preprocess1, {match, Res}), case Res of {error, _} -> ?event(debug_preprocess, preprocessor_did_not_match), @@ -757,7 +822,7 @@ preprocess(Base, RawReq, Opts) -> }] }} end; - {ok, _Method, Node, _Path, _MsgWithoutMeta, _ReqOpts} -> + {ok, _Method, Node, _RoutedPath, _MsgWithoutMeta, _ReqOpts} -> ?event(debug_preprocess, {matched_route, {explicit, Res}}), CommitRequest = hb_util:atom( @@ -783,20 +848,13 @@ preprocess(Base, RawReq, Opts) -> % We additionally ensure that the request itself has a commitment, % such that headers added by the relaying node are not added to the % user's request. - UserReqWithCommit = - case hb_message:signers(Req, Opts) of - [] -> - hb_message:commit( - Req, - Opts, - #{ - <<"commitment-device">> => <<"httpsig@1.0">>, - <<"type">> => <<"unsigned">> - } - ); - _ -> - Req - end, + RoutedReq = with_routed_from(Req, RawReq, Opts), + LoadedReq = with_loaded_routed_from( + hb_cache:ensure_all_loaded(RoutedReq, Opts), + RoutedReq, + Opts + ), + UserReqWithCommit = commit_routed_request(LoadedReq, Opts), UserPath = case hb_maps:get(<<"path">>, Req, not_found, Opts) of P when is_binary(P), byte_size(P) > 0 -> @@ -806,7 +864,7 @@ preprocess(Base, RawReq, Opts) -> _ -> throw({error, invalid_user_path}) end, - RelayReq = + RelayReqBase = #{ <<"device">> => <<"apply@1.0">>, <<"path">> => <<"user-path">>, @@ -814,6 +872,11 @@ preprocess(Base, RawReq, Opts) -> <<"user-path">> => UserPath, <<"user-message">> => UserReqWithCommit }, + RelayReq = + case hb_maps:get(<<"routed-from">>, RoutedReq, not_found, Opts) of + not_found -> RelayReqBase; + Router -> RelayReqBase#{ <<"routed-from">> => Router } + end, ?event(debug_preprocess, {prepared_relay_req, RelayReq}), { ok, @@ -836,8 +899,314 @@ preprocess(Base, RawReq, Opts) -> } end. +%% @doc Add the router that relayed the request, preserving an existing value +%% if this request is already moving through a routed chain. +with_routed_from(Req, RawReq, Opts) -> + case hb_maps:get(<<"routed-from">>, Req, not_found, Opts) of + not_found -> + case routed_from(RawReq, Opts) of + not_found -> Req; + Router -> Req#{ <<"routed-from">> => Router } + end; + _ -> + Req + end. + +with_loaded_routed_body(Req, Opts) -> + case hb_maps:get(<<"routed-from">>, Req, not_found, Opts) of + not_found -> Req; + Router -> with_routed_body(Req, Router, Opts) + end. + +with_loaded_routed_from(LoadedReq, RoutedReq, Opts) -> + case hb_maps:get(<<"routed-from">>, RoutedReq, not_found, Opts) of + not_found -> + LoadedReq; + Router -> + with_loaded_routed_body( + with_routed_message(LoadedReq, Router, Opts), + Opts + ) + end. + +with_routed_message(Req, Router, Opts) -> + case hb_ao:get(<<"type">>, Req, not_found, Opts#{ <<"hashpath">> => ignore }) of + <<"Process">> -> + Req; + _ -> + case hb_maps:get(<<"routed-from">>, Req, not_found, Opts) of + not_found -> Req#{ <<"routed-from">> => Router }; + _ -> Req + end + end. + +with_routed_body(Req, Router, Opts) -> + case hb_maps:get(<<"body">>, Req, not_found, Opts#{ <<"hashpath">> => ignore }) of + Body when is_map(Body) -> + case hb_ao:get(<<"type">>, Body, not_found, Opts#{ <<"hashpath">> => ignore }) of + <<"Process">> -> + Req; + _ -> + RoutedBody = + case hb_maps:get(<<"routed-from">>, Body, not_found, Opts) of + not_found -> Body#{ <<"routed-from">> => Router }; + _ -> Body + end, + CommittedBody = commit_routed_request(RoutedBody, Opts), + {ok, _} = hb_cache:write(CommittedBody, Opts), + Req#{ <<"body">> => CommittedBody } + end; + _ -> + Req + end. + +commit_routed_request(Req, Opts) -> + case hb_message:signers(Req, Opts) of + [] -> + hb_message:commit( + Req, + Opts, + #{ + <<"commitment-device">> => <<"httpsig@1.0">>, + <<"type">> => <<"unsigned">> + } + ); + _ -> Req + end. + +routed_from(RawReq, Opts) -> + case hb_ao:resolve( + #{ <<"device">> => <<"whois@1.0">> }, + #{ <<"path">> => <<"node">> }, + Opts + ) of + {ok, Host} when is_binary(Host), Host =/= <<"unknown">> -> + normalize_router_host(Host, Opts); + _ -> + configured_router_host(RawReq, Opts) + end. + +configured_router_host(RawReq, Opts) -> + case hb_opts:get(host, not_found, Opts) of + not_found -> request_router_host(RawReq, Opts); + Host -> normalize_router_host(Host, Opts) + end. + +request_router_host(RawReq, Opts) -> + Req = hb_ao:get(<<"request">>, RawReq, #{}, Opts#{ <<"hashpath">> => ignore }), + case hb_maps:get(<<"host">>, Req, not_found, Opts) of + not_found -> not_found; + Host -> normalize_router_host(Host, Opts) + end. + +normalize_router_host(<<"http://", _/binary>> = Host, _Opts) -> Host; +normalize_router_host(<<"https://", _/binary>> = Host, _Opts) -> Host; +normalize_router_host(Host, Opts) -> + case binary:match(Host, <<":">>) of + nomatch -> + Port = hb_opts:get(port, not_found, Opts), + case Port of + not_found -> <<"http://", Host/binary>>; + _ -> <<"http://", Host/binary, ":", (hb_util:bin(Port))/binary>> + end; + _ -> + <<"http://", Host/binary>> + end. + +%% @doc Find the HTTP route for a request, using a process ID hint for an +%% initial `/push' if the literal path does not match any route. +route_request(Req, RawReq, Opts) -> + case hb_http:message_to_request(Req, Opts) of + Error = {error, {no_viable_route, _, _}} -> + case initial_push_route_request(Req, RawReq, Opts) of + not_found -> + Error; + RouteReq -> + ?event(debug_preprocess, {initial_push_route_req, RouteReq}), + hb_http:message_to_request(RouteReq, Opts) + end; + Res -> + Res + end. + +%% @doc Add a route-only process path for initial pushes with a process body. +initial_push_route_request(Req, RawReq, Opts) -> + case initial_push_route_path(Req, RawReq, Opts) of + not_found -> not_found; + RoutePath -> Req#{ <<"route-path">> => RoutePath } + end. + +initial_push_route_path(Req, RawReq, Opts) -> + Path = hb_ao:get(<<"path">>, Req, <<>>, Opts#{ <<"hashpath">> => ignore }), + case {is_push_path(Path), initial_push_process_id(Req, RawReq, Opts)} of + {true, ProcID} when ?IS_ID(ProcID) -> + <<"/", ProcID/binary, "/push">>; + _ -> + not_found + end. + +%% @doc Determine if a path is an initial process push. +is_push_path(Path) -> + case hb_path:normalize(hb_path:to_binary(Path)) of + <<"/push">> -> true; + <<"/push?", _/binary>> -> true; + _ -> false + end. + +%% @doc Find the pushed process ID from the raw or parsed hook request. +initial_push_process_id(Req, RawReq, Opts) -> + HookBody = hb_ao:get( + <<"body">>, + RawReq, + [], + Opts#{ <<"hashpath">> => ignore } + ), + ReqBody = hb_ao:get( + <<"body">>, + Req, + not_found, + Opts#{ <<"hashpath">> => ignore } + ), + initial_push_process_id( + [Req, ReqBody | route_body_candidates(HookBody, Opts)], + Opts + ). +initial_push_process_id([], _Opts) -> + not_found; +initial_push_process_id([Candidate | Rest], Opts) -> + case process_id_from_candidate(Candidate, Opts) of + not_found -> initial_push_process_id(Rest, Opts); + ProcID -> ProcID + end. + +%% @doc Normalize a hook body into route hint candidates. +route_body_candidates(Body, _Opts) when is_list(Body) -> Body; +route_body_candidates(Body, Opts) when is_map(Body) -> + case hb_util:is_ordered_list(Body, Opts) of + true -> hb_util:message_to_ordered_list(Body, Opts); + false -> [Body] + end; +route_body_candidates(_, _Opts) -> []. + +%% @doc Return a signed process ID from a candidate message, if present. +process_id_from_candidate(Candidate, Opts) when is_map(Candidate) -> + Process = + case hb_ao:get(<<"body/type">>, Candidate, not_found, Opts) of + <<"Process">> -> + hb_ao:get( + <<"body">>, + Candidate, + not_found, + Opts#{ <<"hashpath">> => ignore } + ); + _ -> + case hb_ao:get(<<"type">>, Candidate, not_found, Opts) of + <<"Process">> -> Candidate; + _ -> not_found + end + end, + case Process of + Msg when is_map(Msg) -> + try + {ok, Committed} = hb_message:with_only_committed(Msg, Opts), + hb_message:id(Committed, signed, Opts) + catch + _:_ -> not_found + end; + _ -> + not_found + end; +process_id_from_candidate(_, _Opts) -> + not_found. + %%% Tests +initial_push_route_request_test() -> + Peer = <<"http://localhost:8735">>, + ClientOpts = #{ <<"priv-wallet">> => ar_wallet:new() }, + Process = + hb_message:commit( + #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"test-device@1.0">>, + <<"push-device">> => <<"push@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"type">> => <<"Process">>, + <<"module">> => <<"test-module">> + }, + ClientOpts + ), + ProcID = hb_message:id(Process, signed, ClientOpts), + Req = Process#{ <<"path">> => <<"/push">>, <<"method">> => <<"POST">> }, + RawReq = #{ <<"request">> => Req, <<"body">> => [Process, Req] }, + NumberedRawReq = + #{ + <<"request">> => Req#{ <<"body">> => not_found }, + <<"body">> => hb_util:list_to_numbered_message([Process, Req]) + }, + RoutePath = <<"/", ProcID/binary, "/push">>, + Opts = + #{ + <<"priv-wallet">> => ar_wallet:new(), + <<"store">> => hb_test_utils:test_store(), + <<"node-host">> => <<"router.test:8734">>, + <<"routes">> => + [ + #{ + <<"template">> => <<"^/.{43}(~.*|/).*">>, + <<"node">> => #{ <<"prefix">> => Peer } + } + ] + }, + ?assertMatch({error, _}, hb_http:message_to_request(Req, Opts)), + ?assertMatch( + {ok, <<"POST">>, Peer, RoutePath, _, _}, + route_request(Req, RawReq, Opts) + ), + ?assertMatch( + {ok, <<"POST">>, Peer, RoutePath, _, _}, + route_request(maps:remove(<<"body">>, Req), NumberedRawReq, Opts) + ), + {ok, #{ <<"body">> := [RelayBase, RelayCall] }} = + preprocess(#{}, RawReq, Opts), + ?assertEqual(Peer, hb_maps:get(<<"peer">>, RelayBase, not_found, Opts)), + ProxyMsg = hb_maps:get(<<"proxy-message">>, RelayCall, #{}, Opts), + UserMsg = hb_maps:get(<<"user-message">>, ProxyMsg, #{}, Opts), + ?assertEqual(not_found, hb_maps:get(<<"route-path">>, UserMsg, not_found, Opts)), + ?assertEqual( + <<"http://router.test:8734">>, + hb_maps:get(<<"routed-from">>, UserMsg, not_found, Opts) + ), + ?assertEqual( + <<"/push">>, + hb_maps:get(<<"user-path">>, ProxyMsg, not_found, Opts) + ), + Msg = + #{ + <<"target">> => ProcID, + <<"type">> => <<"Message">>, + <<"data">> => <<"test-message">> + }, + MsgReq = + #{ + <<"path">> => RoutePath, + <<"method">> => <<"POST">>, + <<"body">> => Msg + }, + {ok, #{ <<"body">> := [_MsgRelayBase, MsgRelayCall] }} = + preprocess(#{}, #{ <<"request">> => MsgReq }, Opts), + MsgProxyMsg = hb_maps:get(<<"proxy-message">>, MsgRelayCall, #{}, Opts), + MsgUserMsg = hb_maps:get(<<"user-message">>, MsgProxyMsg, #{}, Opts), + ?assertEqual( + <<"http://router.test:8734">>, + hb_maps:get(<<"routed-from">>, MsgUserMsg, not_found, Opts) + ), + RoutedBody = hb_maps:get(<<"body">>, MsgUserMsg, #{}, Opts), + ?assertEqual( + <<"http://router.test:8734">>, + hb_maps:get(<<"routed-from">>, RoutedBody, not_found, Opts) + ). + test_provider_test_parallel_() -> {timeout, 30, fun test_provider/0}. test_provider() -> @@ -1991,6 +2360,101 @@ route_nearest_integer_preserves_opts_test_parallel() -> SelectedURIs ). +route_nearest_route_by_process_test() -> + ProcID = hb_util:encode(crypto:strong_rand_bytes(32)), + PushPath = <<"/", ProcID/binary, "~process@1.0/push">>, + ComputePath = <<"/", ProcID/binary, "~process@1.0/compute=3">>, + BaseRoute = + #{ + <<"template">> => <<"^/.{43}(~.*|/).*">>, + <<"strategy">> => <<"Nearest">>, + <<"nodes">> => + [ + #{ + <<"prefix">> => <<"http://localhost:8001">>, + <<"salt">> => <<"local1">>, + <<"wallet">> => hb_util:human_id(ar_wallet:to_address(ar_wallet:new())) + }, + #{ + <<"prefix">> => <<"http://localhost:8002">>, + <<"salt">> => <<"local2">>, + <<"wallet">> => hb_util:human_id(ar_wallet:to_address(ar_wallet:new())) + }, + #{ + <<"prefix">> => <<"http://localhost:8003">>, + <<"salt">> => <<"local3">>, + <<"wallet">> => hb_util:human_id(ar_wallet:to_address(ar_wallet:new())) + } + ] + }, + lists:foreach( + fun(RouteBy) -> + Routes = [BaseRoute#{ <<"route-by">> => RouteBy }], + {ok, PushRoute} = route(#{ <<"path">> => PushPath }, #{ <<"routes">> => Routes }), + {ok, ComputeRoute} = + route(#{ <<"path">> => ComputePath }, #{ <<"routes">> => Routes }), + ?assertEqual( + hb_maps:get(<<"prefix">>, PushRoute, #{}), + hb_maps:get(<<"prefix">>, ComputeRoute, #{}) + ), + ?assertEqual( + <<(hb_maps:get(<<"prefix">>, PushRoute, #{}))/binary, PushPath/binary>>, + hb_maps:get(<<"uri">>, PushRoute, #{}) + ), + ?assertEqual( + <<(hb_maps:get(<<"prefix">>, ComputeRoute, #{}))/binary, ComputePath/binary>>, + hb_maps:get(<<"uri">>, ComputeRoute, #{}) + ) + end, + [43, <<"^/([^/~]{43})">>] + ). + +route_nearest_route_by_process_same_wallet_test() -> + Wallet = <<"bFUk2NRWuoHgxL4tzJQM5gw09snq66uM_fiPJPyQKP0">>, + BaseRoute = + #{ + <<"template">> => <<"^/.{43}(~.*|/).*">>, + <<"strategy">> => <<"Nearest">>, + <<"nodes">> => + [ + #{ + <<"prefix">> => <<"http://localhost:8001">>, + <<"salt">> => <<"local1">>, + <<"wallet">> => Wallet + }, + #{ + <<"prefix">> => <<"http://localhost:8002">>, + <<"salt">> => <<"local2">>, + <<"wallet">> => Wallet + }, + #{ + <<"prefix">> => <<"http://localhost:8003">>, + <<"salt">> => <<"local3">>, + <<"wallet">> => Wallet + } + ] + }, + lists:foreach( + fun(RouteBy) -> + Routes = [BaseRoute#{ <<"route-by">> => RouteBy }], + Prefixes = + lists:usort( + [ + begin + ProcID = hb_util:encode(crypto:hash(sha256, <>)), + Path = <<"/", ProcID/binary, "~process@1.0/push">>, + {ok, Route} = route(#{ <<"path">> => Path }, #{ <<"routes">> => Routes }), + hb_maps:get(<<"prefix">>, Route, #{}) + end + || + N <- lists:seq(1, 30) + ] + ), + ?assert(length(Prefixes) > 1) + end, + [43, <<"^/([^/~]{43})">>] + ). + route_multirequest_parallel_limit_test_parallel_() -> {timeout, 30, fun route_multirequest_parallel_limit/0}. route_multirequest_parallel_limit() -> diff --git a/src/preloaded/process/dev_process.erl b/src/preloaded/process/dev_process.erl index 55eea8feb..bec4a2426 100644 --- a/src/preloaded/process/dev_process.erl +++ b/src/preloaded/process/dev_process.erl @@ -217,7 +217,7 @@ compute(Base, Req, Opts) -> {result, Result} } ), - {ok, without_snapshot(Result, Opts)}; + {ok, compute_response(Result, Req, Opts)}; {error, not_found} -> {ok, Loaded} = ensure_loaded(ProcBase, Req, Opts), ?event(compute, @@ -259,7 +259,7 @@ compute_to_slot(ProcID, Base, Req, TargetSlot, Opts) -> Opts ), store_result(true, ProcID, TargetSlot, Base, Req, Opts), - {ok, without_snapshot(lib_process:as_process(Base, Opts), Opts)}; + {ok, compute_response(lib_process:as_process(Base, Opts), Req, Opts)}; CurrentSlot when CurrentSlot < TargetSlot -> % Compute the next state transition. NextSlot = CurrentSlot + 1, @@ -646,7 +646,7 @@ now(RawBase, Req, Opts) -> LatestKnown = dev_process_cache:latest(ProcessID, [], Opts), case LatestKnown of {ok, LatestSlot, RawLatestMsg} -> - LatestMsg = without_snapshot(RawLatestMsg, Opts), + LatestMsg = compute_response(RawLatestMsg, Req, Opts), ?event(compute_cache, {serving_latest_cached_state, {proc_id, ProcessID}, @@ -791,3 +791,17 @@ ensure_loaded(Base, Req, Opts) -> %% @doc Remove the `snapshot' key from a message and return it. without_snapshot(Msg, Opts) -> hb_ao:set(Msg, <<"snapshot">>, unset, Opts). + +%% @doc Format a compute response for the caller with its result payload inline. +compute_response(Msg, _Req, Opts) -> + with_loaded_results(without_snapshot(Msg, Opts), Opts). + +with_loaded_results(Msg, Opts) -> + Decoded = hb_link:decode_all_links(Msg), + case hb_maps:get(<<"results">>, Decoded, not_found, Opts) of + not_found -> Msg; + Results -> + (maps:remove(<<"results+link">>, Msg))#{ + <<"results">> => hb_cache:ensure_all_loaded(Results, Opts) + } + end. diff --git a/src/preloaded/process/dev_push.erl b/src/preloaded/process/dev_push.erl index 8a549e16d..23983ab33 100644 --- a/src/preloaded/process/dev_push.erl +++ b/src/preloaded/process/dev_push.erl @@ -44,7 +44,7 @@ push(Base, Req, Opts) -> no_slot -> case schedule_initial_message(Process, Req, Opts) of {ok, Assignment} -> - case find_type(hb_ao:get(<<"body">>, Assignment, Opts), Opts) of + case find_type(Req, Opts) of <<"Process">> -> ?event(push, {initializing_process, @@ -197,36 +197,56 @@ do_push(PrimaryProcess, Assignment, Opts) -> <<"source">> => RawMsgToPush } end, + Origin = + with_routed_from( + #{ + <<"process">> => ID, + <<"slot">> => Slot, + <<"outbox-key">> => Key, + <<"result-depth">> => IncludeDepth, + <<"max-depth">> => MaxDepth, + <<"from-base">> => BaseID, + <<"from-uncommitted">> => UncommittedID, + <<"from-scheduler">> => + hb_ao:get( + <<"scheduler">>, + PrimaryProcess, + Opts + ), + <<"from-authority">> => + hb_ao:get( + <<"authority">>, + PrimaryProcess, + Opts + ) + }, + Assignment, + Opts + ), case hb_cache:read(Target, Opts) of {ok, DownstreamProcess} -> push_result_message( DownstreamProcess, MsgToPush, - #{ - <<"process">> => ID, - <<"slot">> => Slot, - <<"outbox-key">> => Key, - <<"result-depth">> => IncludeDepth, - <<"max-depth">> => MaxDepth, - <<"from-base">> => BaseID, - <<"from-uncommitted">> => UncommittedID, - <<"from-scheduler">> => - hb_ao:get( - <<"scheduler">>, - PrimaryProcess, - Opts - ), - <<"from-authority">> => - hb_ao:get( - <<"authority">>, - PrimaryProcess, - Opts - ) - }, + Origin, Opts ); {error, not_found} -> - target_process_not_found(Target) + ?event(push_short, + {target_process_not_found, + {target, Target}, + {outbox_key, Key}, + {process, ID}, + {slot, Slot} + }, + Opts + ), + push_routed_target_message( + Target, + MsgToPush, + Origin, + Opts + ) end; (Key, Msg) -> #{ @@ -261,6 +281,124 @@ target_process_not_found(Target) -> <<"reason">> => <<"Could not access target process!">> }. +with_routed_from(Origin, Assignment, Opts) -> + case routed_from_assignment(Assignment, Opts) of + not_found -> + Origin; + Router -> + add_routed_from(Origin, Router) + end. + +routed_from_assignment(Assignment, Opts) -> + case hb_maps:get(<<"routed-from">>, Assignment, not_found, Opts) of + not_found -> + case hb_maps:get(<<"body">>, Assignment, not_found, Opts) of + Body when is_map(Body) -> + case hb_maps:get(<<"routed-from">>, Body, not_found, Opts) of + not_found -> routed_from_assignment_ao(Assignment, Opts); + Router -> Router + end; + _ -> + routed_from_assignment_ao(Assignment, Opts) + end; + Router -> + Router + end. + +routed_from_assignment_ao(Assignment, Opts) -> + hb_ao:get_first( + [ + {Assignment, <<"routed-from">>}, + {Assignment, <<"body/routed-from">>} + ], + not_found, + Opts#{ <<"hashpath">> => ignore } + ). + +add_routed_from(Origin, not_found) -> Origin; +add_routed_from(Origin, Router) -> Origin#{ <<"routed-from">> => Router }. + +%% @doc Route an unscheduled downstream push to the node that owns the target. +push_routed_target_message(Target, MsgToPush, Origin, Opts) -> + Path = <<"/", Target/binary, "/push">>, + RouteReq = + #{ + <<"path">> => <<"route">>, + <<"route-path">> => Path + }, + ?event(push_route, + {downstream_push_route, + {stage, attempting_route}, + {mode, target_process}, + {target, Target}, + {path, Path}, + {process, maps:get(<<"process">>, Origin)}, + {slot, maps:get(<<"slot">>, Origin)}, + {outbox_key, maps:get(<<"outbox-key">>, Origin)} + }, + Opts + ), + case maps:get(<<"routed-from">>, Origin, not_found) of + Router when is_binary(Router) -> + post_routed_target_message(Router, Path, Target, MsgToPush, Origin, Opts); + not_found -> + case hb_ao:resolve(#{ <<"device">> => <<"router@1.0">> }, RouteReq, Opts) of + {ok, Node} -> + post_routed_target_message(Node, Path, Target, MsgToPush, Origin, Opts); + {error, no_matches} -> + target_process_not_found(Target); + {error, Error} -> + ?event(push, + {no_push_route_found, + {target, Target}, + {error, Error} + }, + Opts + ), + target_process_not_found(Target) + end + end. + +post_routed_target_message(Node, Path, Target, MsgToPush, Origin, Opts) -> + ?event(push, + {routed_target_process_push, + {target, Target}, + {node, Node} + }, + Opts + ), + SignedMsg = + hb_message:commit( + augment_message(Origin, MsgToPush, Opts), + Opts, + hb_opts:get( + scheduler_default_commitment_spec, + <<"httpsig@1.0">>, + Opts + ) + ), + {ok, _} = hb_cache:write(SignedMsg, Opts), + case hb_http:post( + Node, + Path, + hb_maps:without([<<"path">>], SignedMsg, Opts), + Opts + ) of + {ok, Res} -> + #{ + <<"id">> => hb_message:id(SignedMsg, all, Opts), + <<"target">> => Target, + <<"resulted-in">> => Res + }; + {error, Error} -> + ?event(push, {push_failed, {error, Error}}, Opts), + #{ + <<"response">> => <<"error">>, + <<"target">> => Target, + <<"reason">> => Error + } + end. + %% @doc If the outbox message has a path we interpret it as a request to perform %% AO-Core eval and schedule the result. Additionally, we remove the `target` @@ -410,12 +548,56 @@ push_downstream_remote(TargetID, NextSlotOnProc, Origin, RawOpts) -> {push_downstream_remote, {target, TargetID}, {slot, NextSlotOnProc}, + {path, Path}, {origin, Origin}, {opts, Opts} } ), + case maps:get(<<"routed-from">>, Origin, not_found) of + Router when is_binary(Router) -> + ?event(push, + {routing_via_routed_from, + {target, TargetID}, + {slot, NextSlotOnProc}, + {router, Router} + }, + Opts + ), + hb_http:post(Router, Path, Opts); + not_found -> + push_downstream_remote_via_local_route( + TargetID, + NextSlotOnProc, + Origin, + Path, + RouteReq, + Self, + Opts + ) + end. + +push_downstream_remote_via_local_route( + TargetID, + NextSlotOnProc, + Origin, + Path, + RouteReq, + Self, + Opts +) -> case hb_ao:resolve(#{ <<"device">> => <<"router@1.0">> }, RouteReq, Opts) of {error, no_matches} -> + ?event(push_route, + {downstream_push_route, + {stage, route_decided}, + {decision, local_no_route}, + {mode, downstream_slot}, + {target, TargetID}, + {slot, NextSlotOnProc}, + {path, Path} + }, + Opts + ), ?event(push, {no_push_route_found, {target, TargetID}, @@ -426,6 +608,18 @@ push_downstream_remote(TargetID, NextSlotOnProc, Origin, RawOpts) -> ), push_downstream_local(TargetID, NextSlotOnProc, Origin, Opts); {ok, Self} -> + ?event(push_route, + {downstream_push_route, + {stage, route_decided}, + {decision, local_self_route}, + {mode, downstream_slot}, + {target, TargetID}, + {slot, NextSlotOnProc}, + {path, Path}, + {node, Self} + }, + Opts + ), % If we matched ourselves as the route, we can just push locally. ?event(push, {routing_matched_self, @@ -437,6 +631,18 @@ push_downstream_remote(TargetID, NextSlotOnProc, Origin, RawOpts) -> ), push_downstream_local(TargetID, NextSlotOnProc, Origin, Opts); {ok, Node} -> + ?event(push_route, + {downstream_push_route, + {stage, route_decided}, + {decision, remote}, + {mode, downstream_slot}, + {target, TargetID}, + {slot, NextSlotOnProc}, + {path, Path}, + {node, Node} + }, + Opts + ), ?event(push, {routing_matched_remote, {target, TargetID}, @@ -459,12 +665,15 @@ push_downstream_local(TargetID, NextSlotOnProc, Origin, Opts) -> {origin, Origin} } ), + ResultDepth = + decrement_result_depth( + hb_maps:get(<<"result-depth">>, Origin, 1, Opts) + ), BaseReq = #{ <<"path">> => <<"push">>, <<"slot">> => NextSlotOnProc, - <<"result-depth">> => - hb_maps:get(<<"result-depth">>, Origin, 1, Opts) - 1 + <<"result-depth">> => ResultDepth }, Req = case parse_max_depth(hb_maps:get(<<"max-depth">>, Origin, undefined, Opts)) of @@ -492,6 +701,15 @@ parse_max_depth(Bin) when is_binary(Bin) -> end; parse_max_depth(_) -> undefined. +decrement_result_depth(Depth) when is_integer(Depth), Depth > 0 -> Depth - 1; +decrement_result_depth(Depth) when is_binary(Depth) -> + try hb_util:int(Depth) of + N -> decrement_result_depth(N) + catch + _:_ -> 0 + end; +decrement_result_depth(_) -> 0. + %% @doc Augment the message with from-* keys, if it doesn't already have them. normalize_message(MsgToPush, Opts) -> hb_ao:set( @@ -656,10 +874,15 @@ schedule_result(TargetProcess, MsgToPush, Codec, Origin, Opts) -> %% message came from. augment_message(Origin, ToSched, Opts) -> ?event(push, {adding_keys, {origin, Origin}, {to, ToSched}}, Opts), + RoutedFrom = + case maps:get(<<"routed-from">>, Origin, not_found) of + not_found -> #{}; + Router -> #{ <<"routed-from">> => Router } + end, hb_message:uncommitted( hb_ao:set( ToSched, - #{ + RoutedFrom#{ <<"data-protocol">> => <<"ao">>, <<"variant">> => <<"ao.N.1">>, <<"type">> => <<"Message">>, @@ -778,10 +1001,17 @@ schedule_initial_message(Base, Req, Opts) -> case hb_ao:resolve(Base, ModReq, Opts) of {ok, Res} -> case hb_ao:get(<<"status">>, Res, 200, Opts) of - 200 -> {ok, Res}; + 200 -> {ok, add_routed_from(Res, routed_from_assignment(Req, Opts))}; 307 -> Location = hb_ao:get(<<"location">>, Res, Opts), - remote_schedule_result(Location, Req, Opts) + case remote_schedule_result(Location, Req, Opts) of + {ok, RemoteRes} -> + {ok, add_routed_from( + RemoteRes, + routed_from_assignment(Req, Opts) + )}; + Error -> Error + end end; {error, Res = #{ <<"status">> := 422 }} -> ?event(push, {initial_push_wrong_format, {error, Res}}, Opts), @@ -990,7 +1220,7 @@ test_push_as_identity() -> test_multi_process_push() -> {Sender, _Receiver, MsgSlot, Opts} = setup_two_process_message(), - %% Install a catch-all `Pong' handler on the Sender so the Receiver's + %% Install a `Pong' handler on the Sender so the Receiver's %% reply (the helper's `reply_script' fires on `Action = "Ping"' and %% sends back `Action = "Reply"') is observable as `GOT PONG' in the %% Sender's `now/results/data'. @@ -999,7 +1229,9 @@ test_multi_process_push() -> Sender, << "Handlers.add(\"Pong\",\n" - " function (test) return true end,\n" + " function (test)\n" + " return (test.Action or test.action) == \"Reply\"\n" + " end,\n" " function(m)\n" " print(\"GOT PONG\")\n" " end\n" @@ -1191,7 +1423,7 @@ test_remote_routed_push() -> <<"routes">> => [ #{ - <<"template">> => <>, + <<"template">> => <<"/", Proc2ID/binary, ".*">>, <<"node">> => N2 } ] @@ -1202,7 +1434,7 @@ test_remote_routed_push() -> {ok, N2}, hb_http:get( N1, - <<"/~router@1.0/route?route-path=", Proc2ID/binary, "/push&slot=1">>, + <<"/~router@1.0/route?route-path=/", Proc2ID/binary, "/push&slot=1">>, N1Opts ) ), @@ -1210,11 +1442,10 @@ test_remote_routed_push() -> Proc1 = hb_process_test_vectors:aos_process(N1Opts), LoadedProc1 = hb_cache:ensure_all_loaded(Proc1, N1Opts), Proc1ID = hb_message:id(LoadedProc1, all, N1Opts), - % Write both processes to each of the nodes' caches, such that both are - % 'globally' available to each other. + % Write each process to its owner node. The source node must route pushes + % to remote targets when pushing cross-node outbox messages. hb_cache:write(LoadedProc1, N1Opts), hb_cache:write(LoadedProc1, N2Opts), - hb_cache:write(LoadedProc2, N1Opts), hb_cache:write(LoadedProc2, N2Opts), ?event(debug_test, {network_setup, @@ -1585,9 +1816,11 @@ test_nested_push_prompts_encoding_change() -> ping_pong_script(Limit) -> << "Handlers.add(\"Ping\",\n" - " function (test) return true end,\n" + " function (test)\n" + " return (test.Action or test.action) == \"Ping\"\n" + " end,\n" " function(m)\n" - " C = tonumber(m.Count)\n" + " C = tonumber(m.Count or m.count)\n" " if C <= ", (integer_to_binary(Limit))/binary, " then\n" " Send({ Target = ao.id, Action = \"Ping\", Count = C + 1 })\n" " print(\"Ping\", C + 1)\n" @@ -1603,11 +1836,14 @@ reply_script() -> << """ Handlers.add("Reply", - { Action = "Ping" }, function(m) + return (m.Action or m.action) == "Ping" + end, + function(m) + local from = m.From or m.from print("Replying to...") - print(m.From) - Send({ Target = m.From, Action = "Reply", Message = "Pong!" }) + print(from) + Send({ Target = from, Action = "Reply", Message = "Pong!" }) print("Done.") end ) diff --git a/src/preloaded/process/dev_scheduler.erl b/src/preloaded/process/dev_scheduler.erl index fe8dff675..b75ea63d3 100644 --- a/src/preloaded/process/dev_scheduler.erl +++ b/src/preloaded/process/dev_scheduler.erl @@ -15,7 +15,7 @@ %%% -module(dev_scheduler). --device_libraries([lib_process]). +-device_libraries([lib_process, lib_scheduler_formats]). %%% AO-Core API functions: -export([info/0]). %%% Local scheduling functions: @@ -377,17 +377,34 @@ post_schedule(Base, Req, Opts) -> ?event(scheduling_message), % Find the target message to schedule: RawToSched = find_message_to_schedule(Base, Req, Opts), - % If the message can not be properly loaded, this will throw an error - % before scheduling the message. - try hb_cache:ensure_all_loaded(RawToSched, Opts) of - ToSched -> - do_post_schedule(Base, Req, ToSched, Opts) - catch - error:{necessary_message_not_found, _, _} -> + % Filter before loading so uncommitted HTTP wrapper links do not block a + % valid signed message from being scheduled. + case hb_message:with_only_committed(RawToSched, Opts) of + {ok, OnlyCommitted} -> + try hb_cache:ensure_all_loaded(OnlyCommitted, Opts) of + Loaded -> + do_post_schedule( + Base, + Req, + with_routed_from(Loaded, [RawToSched, Req], Opts), + Opts + ) + catch + _: {necessary_message_not_found, _, _} -> + {error, + #{ + <<"status">> => 404, + <<"body">> => <<"Cannot fully load message to schedule.">> + } + } + end; + {error, Err} -> {error, #{ - <<"status">> => 404, - <<"body">> => <<"Cannot fully load message to schedule.">> + <<"status">> => 400, + <<"body">> => <<"Message invalid: ", + "Committed components cannot be validated.">>, + <<"reason">> => Err } } end. @@ -408,25 +425,26 @@ do_post_schedule(Base, Req, ToSched, Opts) -> {message, ToSched} } ), + RoutedCommitted = with_routed_from(OnlyCommitted, [ToSched, Req], Opts), % Find the relevant scheduler server for the given process and % message, start a new one if necessary, or return a redirect to the % correct remote scheduler. case find_server(ProcID, Base, ToSched, Opts) of {local, PID} -> ?event({scheduling_locally, {proc_id, ProcID}, {pid, PID}}), - post_local_schedule(ProcID, PID, OnlyCommitted, Opts); + post_local_schedule(ProcID, PID, RoutedCommitted, Opts); {redirect, Redirect} -> ?event({process_is_remote, {redirect, Redirect}}), case hb_opts:get(scheduler_follow_redirects, true, Opts) of true -> ?event({proxying_to_remote_scheduler, {redirect, Redirect}, - {msg, OnlyCommitted} + {msg, RoutedCommitted} }), post_remote_schedule( ProcID, Redirect, - OnlyCommitted, + RoutedCommitted, Opts ); false -> {ok, Redirect} @@ -446,6 +464,21 @@ do_post_schedule(Base, Req, ToSched, Opts) -> } end. +with_routed_from(Committed, RawMessages, Opts) when is_list(RawMessages) -> + case hb_ao:get_first( + [{Raw, <<"routed-from">>} || Raw <- RawMessages], + not_found, + Opts#{ <<"hashpath">> => ignore } + ) of + not_found -> Committed; + Router -> Committed#{ <<"routed-from">> => Router } + end; +with_routed_from(Committed, Raw, Opts) -> + case hb_ao:get(<<"routed-from">>, Raw, not_found, Opts#{ <<"hashpath">> => ignore }) of + not_found -> Committed; + Router -> Committed#{ <<"routed-from">> => Router } + end. + %% @doc Post schedule the message. `Req' by this point has been refined to only %% committed keys, and to only include the `target' message that is to be %% scheduled. @@ -482,14 +515,6 @@ post_local_schedule(ProcID, PID, Req, Opts) -> }; {true, <<"Process">>} -> {ok, _} = hb_cache:write(Req, Opts), - spawn( - fun() -> - {ok, Results} = hb_client_remote:upload(Req, Opts), - ?event( - {uploaded_process, {proc_id, ProcID}, {results, Results}} - ) - end - ), ?event( {registering_new_process, {proc_id, ProcID}, @@ -776,7 +801,7 @@ remote_slot(<<"ao.TN.1">>, ProcID, Node, Opts) -> % Convert the JSON object for the latest assignment into the % standardized `~scheduler@1.0' format. A = - dev_scheduler_formats:aos2_to_assignment( + lib_scheduler_formats:aos2_to_assignment( JSON, Opts ), @@ -845,7 +870,7 @@ get_schedule(Base, Req, Opts) -> {ok, Res} -> case uri_string:percent_decode(Format) of <<"application/aos-2">> -> - dev_scheduler_formats:assignments_to_aos2( + lib_scheduler_formats:assignments_to_aos2( ProcID, hb_ao:get( <<"assignments">>, Res, [], Opts), @@ -896,7 +921,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, _, Opts) % as a bundle. We set the 'more' to `undefined' to indicate that there may % be more assignments to fetch, but we don't know for sure. Res = - dev_scheduler_formats:assignments_to_bundle( + lib_scheduler_formats:assignments_to_bundle( ProcID, LocalAssignments, undefined, @@ -995,7 +1020,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, Redirect, Opts) -> cache_remote_schedule(Variant, ProcID, JSONRes, Opts), ?event(debug_aos2, {json_res, {json, JSONRes}}), Filtered = filter_json_assignments(JSONRes, To, From, Opts), - dev_scheduler_formats:aos2_to_assignments( + lib_scheduler_formats:aos2_to_assignments( ProcID, Filtered, Opts @@ -1018,7 +1043,7 @@ do_get_remote_schedule(ProcID, LocalAssignments, From, To, Redirect, Opts) -> % Merge the local assignments with the remote assignments, % and normalize the keys. Merged = - dev_scheduler_formats:assignments_to_bundle( + lib_scheduler_formats:assignments_to_bundle( ProcID, MergedAssignments = LocalAssignments ++ RemoteAssignments, hb_ao:get(<<"continues">>, NormSched, false, Opts), @@ -1272,7 +1297,7 @@ post_legacy_schedule(ProcID, OnlyCommitted, Node, Opts) -> ), ?event({assignment_json, AssignmentJSON}), Assignment = - dev_scheduler_formats:aos2_to_assignment( + lib_scheduler_formats:aos2_to_assignment( AssignmentJSON, Opts ), @@ -1386,9 +1411,9 @@ generate_local_schedule(Format, ProcID, From, To, Opts) -> FormatterFun = case uri_string:percent_decode(Format) of <<"application/aos-2">> -> - fun dev_scheduler_formats:assignments_to_aos2/4; + fun lib_scheduler_formats:assignments_to_aos2/4; _ -> - fun dev_scheduler_formats:assignments_to_bundle/4 + fun lib_scheduler_formats:assignments_to_bundle/4 end, Res = FormatterFun(ProcID, Assignments, More, Opts), ?event({assignments_bundle_outbound, {format, Format}, {res, Res}}), diff --git a/src/preloaded/process/dev_scheduler_cache.erl b/src/preloaded/process/dev_scheduler_cache.erl index cd25d7aad..f8139dc76 100644 --- a/src/preloaded/process/dev_scheduler_cache.erl +++ b/src/preloaded/process/dev_scheduler_cache.erl @@ -91,7 +91,7 @@ read(ProcID, Slot, RawOpts) -> case hb_ao:get(<<"variant">>, Assignment, Opts) of <<"ao.TN.1">> -> Loaded = hb_cache:ensure_all_loaded(Assignment, Opts), - Norm = dev_scheduler_formats:aos2_to_assignment(Loaded, Opts), + Norm = lib_scheduler_formats:aos2_to_assignment(Loaded, Opts), ?event({normalized_aos2_assignment, Norm}), {ok, Norm}; <<"ao.N.1">> -> diff --git a/src/preloaded/process/dev_scheduler_server.erl b/src/preloaded/process/dev_scheduler_server.erl index 3a4f4ea95..03aa22dae 100644 --- a/src/preloaded/process/dev_scheduler_server.erl +++ b/src/preloaded/process/dev_scheduler_server.erl @@ -212,9 +212,10 @@ do_assign(State, Message, ReplyPID) -> BaseStateHashpath = base_state(State), NextSlot = maps:get(current, State) + 1, {Timestamp, Height, Hash} = ar_timestamp:get(), + AssignmentMeta = routed_assignment_metadata(Message, Opts), Assignment = commit_assignment( - #{ + AssignmentMeta#{ <<"path">> => case hb_path:from_message(request, Message, Opts) of undefined -> <<"compute">>; @@ -263,17 +264,7 @@ do_assign(State, Message, ReplyPID) -> State ), ?event(writes_complete), - ?event(uploading_message), - hb_client_remote:upload(Message, Opts), - hb_client_remote:upload(Assignment, Opts), - ?event(uploads_complete), - maybe_inform_recipient( - remote_confirmation, - ReplyPID, - Message, - Assignment, - State - ) + dispatch_uploads(Message, Assignment, ReplyPID, State) end, case hb_opts:get(scheduling_mode, sync, Opts) of aggressive -> @@ -288,23 +279,40 @@ do_assign(State, Message, ReplyPID) -> base_state_hashpath := next_hashpath(BaseStateHashpath, Assignment, State) }. +routed_assignment_metadata(Message, Opts) -> + case hb_ao:get(<<"routed-from">>, Message, not_found, Opts#{ <<"hashpath">> => ignore }) of + not_found -> #{}; + Router -> #{ <<"routed-from">> => Router } + end. + %% @doc Commit to the assignment using all of our appropriate wallets. commit_assignment(BaseAssignment, State) -> Wallets = maps:get(wallets, State), Opts = maps:get(opts, State), CommittmentSpec = maps:get(committment_spec, State), - lists:foldr( - fun(Wallet, Assignment) -> - hb_message:commit( - Assignment, - Opts#{ <<"priv-wallet">> => Wallet }, - CommittmentSpec - ) + lists:foldl( + fun(Wallet, Acc) -> + Signed = + hb_message:commit( + BaseAssignment, + Opts#{ <<"priv-wallet">> => Wallet }, + CommittmentSpec + ), + merge_commitments(Acc, Signed) end, BaseAssignment, Wallets ). +merge_commitments(Base, Signed) -> + Signed#{ + <<"commitments">> => + maps:merge( + maps:get(<<"commitments">>, Base, #{}), + maps:get(<<"commitments">>, Signed, #{}) + ) + }. + %% @doc Potentially inform the caller that the assignment has been scheduled. %% The main assignment loop calls this function repeatedly at different stages %% of the assignment process. The scheduling mode determines which stages @@ -315,6 +323,42 @@ maybe_inform_recipient(Mode, ReplyPID, Message, Assignment, State) -> _ -> ok end. +dispatch_uploads(Message, Assignment, ReplyPID, State) -> + Opts = maps:get(opts, State), + UploadOpts = Opts#{ <<"http-monitor">> => not_found }, + UploadFun = + fun() -> + ?event(uploading_message), + hb_client_remote:upload(Message, UploadOpts), + hb_client_remote:upload(Assignment, UploadOpts), + ?event(uploads_complete), + maybe_inform_recipient( + remote_confirmation, + ReplyPID, + Message, + Assignment, + State + ) + end, + case maps:get(mode, State) of + remote_confirmation -> UploadFun(); + _ -> + spawn( + fun() -> + try UploadFun() + catch Class:Reason:Stack -> + ?event(warning, + {scheduler_upload_failed, + {class, Class}, + {reason, Reason}, + {trace, Stack} + } + ) + end + end + ) + end. + %% @doc Find the hashpath of the base state upon which a new assignment should %% be applied. base_state(S = #{ base_state_hashpath := undefined }) -> diff --git a/src/preloaded/process/dev_scheduler_formats.erl b/src/preloaded/process/lib_scheduler_formats.erl similarity index 97% rename from src/preloaded/process/dev_scheduler_formats.erl rename to src/preloaded/process/lib_scheduler_formats.erl index 8d178f209..ad176b68d 100644 --- a/src/preloaded/process/dev_scheduler_formats.erl +++ b/src/preloaded/process/lib_scheduler_formats.erl @@ -1,12 +1,12 @@ -%%% @doc This module is used by dev_scheduler in order to produce outputs that -%%% are compatible with various forms of AO clients. It features two main formats: +%%% @doc Shared scheduler response format helpers for devices that need outputs +%%% compatible with various forms of AO clients. It features two main formats: %%% %%% - `application/json' %%% - `application/http' %%% %%% The `application/json' format is a legacy format that is not recommended for %%% new integrations of the AO protocol. --module(dev_scheduler_formats). +-module(lib_scheduler_formats). -export([assignments_to_bundle/4, assignments_to_aos2/4]). -export([aos2_to_assignments/3, aos2_to_assignment/2]). -export([aos2_normalize_types/1]). diff --git a/src/preloaded/util/dev_apply.erl b/src/preloaded/util/dev_apply.erl index c1d7e813e..544723459 100644 --- a/src/preloaded/util/dev_apply.erl +++ b/src/preloaded/util/dev_apply.erl @@ -75,15 +75,68 @@ eval(Base, Request, Opts) -> >> }; {ok, ApplyPath} -> - ApplyMsg = ApplyBase#{ <<"path">> => ApplyPath }, + ApplyMsg = apply_msg(ApplyBase, ApplyPath, Base, Request, Opts), ?event({executing, ApplyMsg}), - hb_ao:resolve(ApplyMsg, Opts) + execute_apply(ApplyMsg, ApplyPath, Opts) end end else Error -> error_to_message(Error) end. +execute_apply(ApplyMsg, ApplyPath, Opts) -> + case hb_path:term_to_path_parts(ApplyPath) of + [ID | _] when ?IS_ID(ID) -> + Meta = hb_device:message_to_device( + #{ <<"device">> => <<"meta@1.0">> }, + Opts + ), + unwrap_ao_result(Meta:handle(Opts, ApplyMsg), Opts); + _ -> + hb_ao:resolve(ApplyMsg, Opts) + end. + +unwrap_ao_result({ok, Res}, Opts) when is_map(Res) -> + case hb_maps:get(<<"ao-result">>, Res, not_found, Opts) of + Key when is_binary(Key) -> + case hb_maps:find(Key, Res, Opts) of + {ok, Value} -> {ok, Value}; + error -> {error, not_found} + end; + _ -> + {ok, Res} + end; +unwrap_ao_result(Res, _Opts) -> + Res. + +apply_msg(ApplyBase, ApplyPath, Base, Request, Opts) -> + Msg = ApplyBase#{ <<"path">> => ApplyPath }, + RoutedMsg = with_routed_from(Msg, Base, Request, Opts), + case hb_path:term_to_path_parts(ApplyPath) of + [ID | _] when ?IS_ID(ID) -> + hb_maps:without([<<"device">>], RoutedMsg, Opts); + _ -> + RoutedMsg + end. + +with_routed_from(Msg, Base, Request, Opts) -> + case hb_ao:get_first( + [ + {Msg, <<"routed-from">>}, + {Request, <<"routed-from">>}, + {Base, <<"routed-from">>} + ], + not_found, + Opts#{ <<"hashpath">> => ignore } + ) of + not_found -> Msg; + Router -> + case hb_maps:get(<<"routed-from">>, Msg, not_found, Opts) of + not_found -> Msg#{ <<"routed-from">> => Router }; + _ -> Msg + end + end. + %% @doc Apply the message found at `request' to the message found at `base'. pair(Base, Request, Opts) -> pair(<<"undefined">>, Base, Request, Opts). @@ -285,3 +338,30 @@ apply_over_http_test() -> #{ <<"priv-wallet">> => hb:wallet() } ) ). + +apply_id_path_as_singleton_test() -> + Opts = #{ <<"store">> => hb_test_utils:test_store() }, + Node = hb_http_server:start_node(Opts), + Signed = + hb_message:commit( + #{ <<"body">> => <<"DATA">> }, + Opts#{ <<"priv-wallet">> => hb:wallet() } + ), + {ok, ID} = hb_cache:write(Signed, Opts), + Path = <<"/", ID/binary, "/body">>, + ?assertEqual({ok, <<"DATA">>}, hb_http:get(Node, Path, Opts)), + ?assertEqual( + {ok, <<"DATA">>}, + hb_http:request( + <<"POST">>, + Node, + <<"/user-path">>, + #{ + <<"device">> => <<"apply@1.0">>, + <<"source">> => <<"user-message">>, + <<"user-path">> => Path, + <<"user-message">> => #{ <<"method">> => <<"GET">> } + }, + Opts + ) + ). diff --git a/src/preloaded/vm/dev_delegated_compute.erl b/src/preloaded/vm/dev_delegated_compute.erl index 64daea028..f7c058f6d 100644 --- a/src/preloaded/vm/dev_delegated_compute.erl +++ b/src/preloaded/vm/dev_delegated_compute.erl @@ -3,7 +3,7 @@ %%% bring trusted results into the local node, or as the `Execution-Device' of %%% an AO process. -module(dev_delegated_compute). --device_libraries([lib_process]). +-device_libraries([lib_process, lib_scheduler_formats]). -export([init/3, compute/3, normalize/3, snapshot/3]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -83,7 +83,7 @@ do_compute(ProcID, Req, Opts) -> ?event({do_compute_msg, {req, Req}}), Slot = hb_ao:get(<<"slot">>, Req, Opts), {ok, AOS2 = #{ <<"body">> := Body }} = - dev_scheduler_formats:assignments_to_aos2( + lib_scheduler_formats:assignments_to_aos2( ProcID, #{ Slot => Req @@ -151,6 +151,7 @@ do_relay(Method, Path, Body, Headers, Opts) -> Headers#{ <<"path">> => <<"call">>, <<"target">> => <<"payload">>, + <<"peer">> => genesis_wasm_peer(Opts), <<"payload">> => Headers#{ <<"path">> => Path, @@ -233,6 +234,7 @@ snapshot(Msg, Req, Opts) -> }, #{ <<"path">> => <<"call">>, + <<"peer">> => genesis_wasm_peer(Opts), <<"relay-method">> => <<"POST">>, <<"relay-path">> => <<"/snapshot/", ProcID/binary>>, <<"content-type">> => <<"application/json">>, @@ -254,3 +256,10 @@ snapshot(Msg, Req, Opts) -> <<"error-details">> => Error }} end. + +genesis_wasm_peer(Opts) -> + Port = + integer_to_binary( + hb_opts:get(genesis_wasm_port, 6363, Opts) + ), + <<"http://localhost:", Port/binary>>.