Skip to content

Commit 85f1d7a

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Return error if stream publisher reference is longer than 255 characters
Fixes #12499 (cherry picked from commit 4e8fb46)
1 parent 2292638 commit 85f1d7a

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

+24-2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@
8181
-define(UNKNOWN_FIELD, unknown_field).
8282
-define(SILENT_CLOSE_DELAY, 3_000).
8383

84+
-import(rabbit_stream_utils, [check_write_permitted/2]).
85+
8486
%% client API
8587
-export([start_link/4,
8688
info/2,
@@ -1655,6 +1657,26 @@ handle_frame_post_auth(Transport,
16551657
{C1#stream_connection{connection_step = failure}, S1}
16561658
end,
16571659
{Connection1, State1};
1660+
handle_frame_post_auth(Transport,
1661+
#stream_connection{user = User,
1662+
resource_alarm = false} = C,
1663+
State,
1664+
{request, CorrelationId,
1665+
{declare_publisher, _PublisherId, WriterRef, S}})
1666+
when is_binary(WriterRef), byte_size(WriterRef) > 255 ->
1667+
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
1668+
ok ->
1669+
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
1670+
error ->
1671+
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
1672+
end,
1673+
response(Transport,
1674+
C,
1675+
declare_publisher,
1676+
CorrelationId,
1677+
Code),
1678+
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
1679+
{C, State};
16581680
handle_frame_post_auth(Transport,
16591681
#stream_connection{user = User,
16601682
publishers = Publishers0,
@@ -1664,7 +1686,7 @@ handle_frame_post_auth(Transport,
16641686
State,
16651687
{request, CorrelationId,
16661688
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
1667-
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
1689+
case check_write_permitted(stream_r(Stream,
16681690
Connection0),
16691691
User)
16701692
of
@@ -3102,7 +3124,7 @@ evaluate_state_after_secret_update(Transport,
31023124
{_, Conn1} = ensure_token_expiry_timer(User, Conn0),
31033125
PublisherStreams =
31043126
lists:foldl(fun(#publisher{stream = Str}, Acc) ->
3105-
case rabbit_stream_utils:check_write_permitted(stream_r(Str, Conn0), User) of
3127+
case check_write_permitted(stream_r(Str, Conn0), User) of
31063128
ok ->
31073129
Acc;
31083130
_ ->

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

+34-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ groups() ->
6464
test_super_stream_duplicate_partitions,
6565
authentication_error_should_close_with_delay,
6666
unauthorized_vhost_access_should_close_with_delay,
67-
sasl_anonymous
67+
sasl_anonymous,
68+
test_publisher_with_too_long_reference_errors
6869
]},
6970
%% Run `test_global_counters` on its own so the global metrics are
7071
%% initialised to 0 for each testcase
@@ -945,6 +946,38 @@ unauthorized_vhost_access_should_close_with_delay(Config) ->
945946
closed = wait_for_socket_close(T, S, 10),
946947
ok.
947948

949+
test_publisher_with_too_long_reference_errors(Config) ->
950+
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
951+
T = gen_tcp,
952+
Port = get_port(T, Config),
953+
Opts = get_opts(T),
954+
{ok, S} = T:connect("localhost", Port, Opts),
955+
C = rabbit_stream_core:init(0),
956+
ConnectionName = FunctionName,
957+
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
958+
test_authenticate(T, S, C),
959+
960+
Stream = FunctionName,
961+
test_create_stream(T, S, Stream, C),
962+
963+
MaxSize = 255,
964+
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
965+
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),
966+
967+
Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
968+
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],
969+
970+
[begin
971+
F = request({declare_publisher, PubId, Ref, Stream}),
972+
ok = T:send(S, F),
973+
{Cmd, C} = receive_commands(T, S, C),
974+
?assertMatch({response, 1, {declare_publisher, ExpectedResponseCode}}, Cmd)
975+
end || {PubId, Ref, ExpectedResponseCode} <- Tests],
976+
977+
test_delete_stream(T, S, Stream, C),
978+
test_close(T, S, C),
979+
ok.
980+
948981
consumer_offset_info(Config, ConnectionName) ->
949982
[[{offset, Offset},
950983
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,

0 commit comments

Comments
 (0)