Randomly dropping messages is the easiest way to do such a thing, and might also be the most robust implementation, due to its simplicity.
The trick is to define some threshold value between 0.0 and 1.0 and to fetch a random number in that range:
    -module(drop).
    -export([random/1]).

    random(Rate) ->
        maybe_seed(),
        random:uniform() =< Rate.

    maybe_seed() ->
        case get(random_seed) of
            undefined -> random:seed(erlang:now());
            {X,X,X} -> random:seed(erlang:now());
            _ -> ok
        end.
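The `random` module and `erlang:now/0` used above are deprecated in recent Erlang/OTP releases. A minimal sketch of the same check using the `rand` module, which seeds the calling process on first use, might look like the following. The module name `drop_rand` is hypothetical and not part of the original text.

```erlang
-module(drop_rand).
-export([random/1]).

%% Returns true when the message should be kept.
%% Rate is the fraction of messages to keep, between 0.0 and 1.0.
%% rand:uniform/0 seeds the calling process automatically on first use,
%% so no explicit seeding helper is needed here.
%% rand:uniform/0 yields a float X with 0.0 =< X < 1.0, so a strict
%% comparison keeps nothing at 0.0 and everything at 1.0.
random(Rate) when is_number(Rate), Rate >= 0, Rate =< 1 ->
    rand:uniform() < Rate.
```

The call sites stay the same apart from the module name, for example `drop_rand:random(0.95) andalso send()`.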
If you aim to keep 95% of the messages you send, the check could be written as:

    case drop:random(0.95) of
        true -> send();
        false -> drop()
    end.

or, more briefly, if you don't need to do anything specific when dropping a message:

    drop:random(0.95) andalso send().
The best way to avoid overloading a queue is to not send data its way in the first place. Because there are no bounded mailboxes in Erlang, dropping in the receiving process only guarantees that this process will be spinning wildly, trying to get rid of messages, and fighting the schedulers to do actual work.
On the other hand, dropping at the producer level is guaranteed to distribute the work equally across all processes.
This can give rise to interesting optimizations where the working process or a given monitor process[15] uses values in an ETS table or application:set_env/3 to dynamically increase and decrease the threshold to be used with the random number.
This allows control over how many messages are dropped based on overload, and the configuration data can be fetched by any process rather efficiently by using application:get_env/2.
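As an illustration of the idea, the sketch below stores the keep rate in the application environment and lets a monitor adjust it from the mailbox length of a worker. The module name `drop_config`, the application name `my_app`, the key `keep_rate`, and the queue-length cutoffs are all assumptions made for the example. It reads the value with application:get_env/3, which takes a default, rather than get_env/2.

```erlang
-module(drop_config).
-export([set_rate/1, rate/0, adjust/1, maybe_send/2]).

%% Hypothetical application name and key under which the rate is stored.
-define(APP, my_app).
-define(KEY, keep_rate).

%% Called by the worker or a monitor process to publish a new keep rate.
set_rate(Rate) when is_float(Rate), Rate >= 0.0, Rate =< 1.0 ->
    application:set_env(?APP, ?KEY, Rate).

%% Called by producers; keeps everything when no rate has been set.
rate() ->
    application:get_env(?APP, ?KEY, 1.0).

%% Example heuristic for a monitor: lower the keep rate as the mailbox
%% of Pid grows. The cutoffs are arbitrary illustration values.
adjust(Pid) ->
    case process_info(Pid, message_queue_len) of
        {message_queue_len, Len} when Len > 10000 -> set_rate(0.50);
        {message_queue_len, Len} when Len > 1000 -> set_rate(0.90);
        {message_queue_len, _} -> set_rate(1.0);
        undefined -> ok   % the process is no longer alive
    end.

%% Producer-side send that drops according to the current rate.
maybe_send(Pid, Msg) ->
    case rand:uniform() < rate() of
        true -> Pid ! Msg, sent;
        false -> dropped
    end.
```

A monitor would call `drop_config:adjust(WorkerPid)` periodically, while producers only ever call `drop_config:maybe_send/2` and never need to know how the rate was chosen.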
Similar techniques could also be used to implement different drop ratios for different message priorities, rather than trying to sort it all out at the consumer level.
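One possible shape for priority-aware dropping at the producer is sketched below. The module name `drop_priority`, the priority atoms, and the rates attached to them are hypothetical values chosen for illustration only.

```erlang
-module(drop_priority).
-export([keep/1, maybe_send/3]).

%% Hypothetical keep rates per priority: high-priority messages are
%% never dropped, low-priority ones are dropped half of the time.
rate(high) -> 1.0;
rate(normal) -> 0.95;
rate(low) -> 0.50.

%% Returns true when a message of the given priority should be kept.
keep(Priority) ->
    rand:uniform() < rate(Priority).

%% Sends Msg to Pid only when the priority check passes.
maybe_send(Pid, Priority, Msg) ->
    keep(Priority) andalso (Pid ! Msg),
    ok.
```

Because the decision is made before sending, the consumer never has to inspect or reorder messages it would have discarded anyway.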
[15] Any process tasked with checking the load of specific processes using heuristics such as process_info(Pid, message_queue_len) could be a monitor.