Asynchronous Dancer2 PT. 2 - Promises and a Bonus!
Now that you're familiar with asynchronous programming in Dancer2, we can delve into taking awkward async code and turning it into more manageable code using Promises.
Awkward?
Looking at the code in the previous article, you might see how our code begins to shift to the right as callbacks pile up. Each async call requires an anonymous subroutine that gets triggered once the async request (whether an API request or a DB call) returns. Our code will start to look like an arrow.
    use experimental qw< postderef signatures >;

    do_this_async( sub ($foo) {
        do_this_other_thing_async( $foo, sub ($bar) {
            foreach my $baz ( $bar->@* ) {
                do_more_async( $baz, sub ($quux) {
                    ...
                });
            }
        });
    });
Eventually, this can become quite unwieldy. Promises are a pattern that can help us tame our code. The Promises module is an exceptional implementation of this pattern, so we will use it.
So what are Promises?
Promises are a simple mechanism for turning that arrow pattern above into a clear straight chain where each callback is stored in an object using a method, and the eventual value can be used in another callback, again stored in the same object using a method.
Promises also provide us with comfortable method chaining.
Let's take the above code and rewrite it using the Promises syntax:
    use experimental qw< postderef signatures >;

    do_this_async()->then( sub ($foo) {
        return do_this_other_thing_async($foo)
    })->then( sub ($bar) {
        # One Promise per input, gathered into a list
        my @promises = map { do_more_async($_) } $bar->@*;
        return collect(@promises);
    })->then( sub (@results) {
        ...
    });
Assuming that do_this_async returns a Promise object, we can call then to tack on a callback for the result that this method will eventually retrieve.

We can then call another method (do_this_other_thing_async), which will also return a Promise, to which we tack another callback that will use the value we retrieved from the previous method.

Notice everything here so far has been done with callbacks. We don't execute anything except the initial do_this_async. The rest simply registers callbacks. When it all gets executed at the end, this smart chaining will cause them to be triggered in the right order, connecting them together.

Next, we call a method do_more_async, which again returns a Promise, for each of our inputs. We collect those in a variable and call the Promises function collect, which creates a single promise from a set of them. We can then tack a callback onto it using then.
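To make the chaining concrete, here is a minimal sketch. It assumes the Promises module is installed and is running with its default backend, which fires callbacks as soon as a promise is settled. The helper double_async is hypothetical and exists only for this illustration.

```perl
use strict;
use warnings;
use experimental qw< signatures >;
use Promises qw< deferred >;

# Hypothetical helper: returns a Promise that is resolved with twice its input.
# A real async function would call resolve() later, from an event-loop callback.
sub double_async ($n) {
    my $def = deferred();
    $def->resolve( $n * 2 );
    return $def->promise;
}

my $final;

double_async(2)->then( sub ($four) {
    # Returning a Promise: the next "then" waits for its value
    return double_async($four);
})->then( sub ($eight) {
    # Returning a plain value: it is handed to the next "then" as-is
    return $eight + 1;
})->then( sub ($nine) {
    $final = $nine;
});

print "final: ", ( $final // 'not settled yet' ), "\n";
```

Each then callback either returns another Promise (and the chain waits for it) or a plain value (which is passed straight along), so the code stays flat no matter how many steps we add.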
Of course, we will need to have these functions return Promise objects, and you'll get to see how that is done.
Okay, so how do we rewrite this?
You might not have fully understood how Promises work - that's okay. But even if you did, you might not know how to move our previous code to this. No matter which category you fall in (maybe both?), we're now going to do it together.
Update our imports
    package CovidStats::Promises;
    use Dancer2;
    use experimental qw< postderef signatures >;
    use DateTime;
    use AnyEvent;
    use AnyEvent::HTTP;
    use Promises qw< collect deferred >;
    use Statistics::Descriptive::Full;
    use URI::Escape qw< uri_escape >;

    use constant {
        'MAX_COUNTRIES' => 5,
        'MAX_DAYS'      => 14,
        'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
        'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
        'LTRIM'         => '0.25',
    };
Similar to before, but this time we also load Promises.
Starting with the initial request
The big benefit of Promises is that it changes the order of writing (without changing the order of execution). While we previously started by writing the code for the end result, we don't need to do this now. We can begin directly with the first request.
We will need to turn all of this into Promises, so we start by creating a deferred object:
    get '/' => sub {
        return delayed {
            flush();
            my $def = deferred();
We can decide that this succeeded (using resolve) or failed (using reject) and we can retrieve the Promise object from it. (We're covering Promises loosely here, so you should probably read the Promises documentation to fully comprehend the syntax.)
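As a small illustration of the two outcomes, here is a sketch that settles one deferred with resolve and another with reject. It assumes the default Promises backend, and the helper fetch_async is hypothetical (it stands in for a real request).

```perl
use strict;
use warnings;
use experimental qw< signatures >;
use Promises qw< deferred >;

# Hypothetical helper: settles a deferred based on a flag and returns its Promise.
sub fetch_async ($ok) {
    my $def = deferred();

    if ($ok) {
        $def->resolve('payload');           # success: value goes to the first callback
    }
    else {
        $def->reject('request failed');     # failure: reason goes to the second callback
    }

    return $def->promise;
}

my @log;
foreach my $ok ( 1, 0 ) {
    fetch_async($ok)->then(
        sub ($value) { push @log, "resolved: $value" },
        sub ($error) { push @log, "rejected: $error" },
    );
}

print "$_\n" for @log;
```

The caller only ever sees the Promise. The deferred object stays with the code that performs the work, which is the only place that should decide between resolve and reject.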
Now the request:
    http_get SUMMARY_URL(), sub ( $body, $hdrs ) {
        my $data = from_json($body);
        $def->resolve($data)
    };
What we do here is make a request. When it responds (whenever that is), we mark the promise object as resolved. We also pass along what we received, so it can be used as the eventual data in the Promise.
We can now get the Promise object by calling promise on the deferred object. Once we have the Promise object, we can start tacking on the rest of the logic.
    $def->promise->then( sub ($data) {
        my @countries = (
            sort { $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'} }
            grep $_ ne 'Global', keys $data->%*
        )[ 0 .. MAX_COUNTRIES() ];

        if ( !@countries ) {
            die "Sorry, failed to receive countries\n";
        }

        return \@countries;
    })->then( delayed( sub ($countries) {...} ) );
We start by getting the Promise (by calling promise on the deferred) and from now on, we can just start chaining then methods. The $data variable will eventually (once it actually gets called) be the parameter that we sent to the resolve method above. That's the chaining that is happening here.

We can define the top countries and return them. We can also just die when something fails because any die within Promises is caught and managed using an exception catching block. We'll get to it at the end.

Notice this time, we don't use a delayed block because we're not calling any Dancer2 keywords (like content or done).

The return value will be wrapped in a Promise, which then allows us to chain another then to it with a callback. The parameter will be the countries.
You might be wondering why we create two code blocks instead of keeping one big code block. Theoretically, we could, but separating the work into multiple blocks is more readable and also allows the event loop to run other things between those code blocks if necessary. In short, this is just good practice.
Multiple requests with Promises
Here is a bit of a trick. A promise is meant for only one value, which normally means only one asynchronous action. We would need to collect multiple Promises - one Promise per API request (per country, based on our API). We can do that and then create a single Promise from all of them using... collect!
    })->then( delayed( sub ($countries) {
        my $yesterday = DateTime->now->subtract( 'days' => 1 );

        my @promises;
        foreach my $country ( $countries->@* ) {
            my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
            my $def      = deferred();

            http_get $data_url, delayed( sub ( $body, $hdrs ) {
                my $country_data = from_json($body);

                my @period;
                # DateTime math changes the object in place, so work on a
                # copy to keep $yesterday intact for the other callbacks
                my $day = $yesterday->clone;
                foreach ( 0 .. MAX_DAYS() ) {
                    push @period, $day->ymd();
                    $day = $day->subtract( 'days' => 1 );
                }

                my @period_data = $country_data->{'All'}{'dates'}->@{@period};
                my $stat        = Statistics::Descriptive::Full->new();
                $stat->add_data($_) for @period_data;

                $def->resolve( $country => $stat->trimmed_mean( LTRIM() ) );
            });

            push @promises, $def->promise();
        }

        return collect(@promises);
    }))->then( delayed( sub (@stats_by_country) {...} ) );
Notice we repeat the same pattern: creating a deferred object, making each of these requests, adding the Promise from the deferred object into an array, and eventually calling collect to create a single Promise.

The then callback will receive an array where each element represents the response from each Promise. The order will not be by execution, but by how we inserted them into the original @promises array.
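The ordering guarantee can be seen in a small sketch. It assumes the default Promises backend and uses made-up labels and numbers instead of real API responses. The deferreds are resolved out of order on purpose.

```perl
use strict;
use warnings;
use experimental qw< signatures >;
use Promises qw< collect deferred >;

# Three pending deferreds, standing in for three API requests
my @defs = map { deferred() } 1 .. 3;

my @results;
collect( map { $_->promise } @defs )->then( sub (@all) {
    # Each element is an array reference holding whatever that
    # Promise was resolved with (here: a label and a number)
    @results = @all;
});

# Resolve in a different order than the Promises were passed to collect()
$defs[2]->resolve( 'third'  => 3 );
$defs[0]->resolve( 'first'  => 1 );
$defs[1]->resolve( 'second' => 2 );

print "$_->[0]: $_->[1]\n" for @results;
```

This is also why the final callback in our application can read $_->[0] and $_->[1]: each request resolved its Promise with two values, the country and its trimmed mean.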
Handle the result
Simple enough:
    }))->then( delayed( sub (@stats_by_country) {
        content "By country (period of " . MAX_DAYS() . " days):\n";
        content "- $_->[0]: $_->[1]\n" for @stats_by_country;
    }))...
Add an exception catch block and our final block
Promises give us the catch block to catch exceptions and a finally block to handle the end, whether it worked or not.
In this case, we can use them as such:
    }))->catch( delayed( sub ($error) {
        content($error);
    }))->finally( delayed( sub (@args) {
        content "\nThank you for visiting our API\n";
        done();
    }));
We caught the error and sent it to the user. We end everything by calling done in the finally block.
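Outside of Dancer2, the same flow can be sketched in a few lines. This assumes the default Promises backend and a version of Promises that turns exceptions thrown in callbacks into rejections, as described above. The @events array is only there so we can see which callbacks ran.

```perl
use strict;
use warnings;
use experimental qw< signatures >;
use Promises qw< deferred >;

my @events;
my $def = deferred();

$def->promise->then( sub ($value) {
    # An exception in a callback rejects the rest of the chain
    die "Sorry, something failed\n";
})->then( sub {
    push @events, 'this step is skipped';
})->catch( sub ($error) {
    push @events, "caught: $error";
})->finally( sub {
    # Runs whether the chain succeeded or failed
    push @events, 'finally';
});

$def->resolve('some data');

print "$_\n" for @events;
```

The second then never runs because the chain is already rejected by the time it is reached, while catch and finally both do.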
Full program
The full program is:
    package CovidStats::Promises;
    use Dancer2;
    use experimental qw< postderef signatures >;
    use DateTime;
    use AnyEvent;
    use AnyEvent::HTTP;
    use Promises qw< collect deferred >;
    use Statistics::Descriptive::Full;
    use URI::Escape qw< uri_escape >;

    use constant {
        'MAX_COUNTRIES' => 5,
        'MAX_DAYS'      => 14,
        'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
        'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
        'LTRIM'         => '0.25',
    };

    get '/' => sub {
        return delayed {
            flush();
            my $def = deferred();

            http_get SUMMARY_URL(), delayed( sub ( $body, $hdrs ) {
                my $data = from_json($body);
                $def->resolve($data)
            });

            $def->promise->then( delayed( sub ($data) {
                my @countries = (
                    sort { $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'} }
                    grep $_ ne 'Global', keys $data->%*
                )[ 0 .. MAX_COUNTRIES() ];

                if ( !@countries ) {
                    die "Sorry, failed to receive countries\n";
                }

                return \@countries;
            }))->then( delayed( sub ($countries) {
                my $yesterday = DateTime->now->subtract( 'days' => 1 );

                my @promises;
                foreach my $country ( $countries->@* ) {
                    my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
                    my $def      = deferred();

                    http_get $data_url, delayed( sub ( $body, $hdrs ) {
                        my $country_data = from_json($body);

                        my @period;
                        # DateTime math changes the object in place, so work on
                        # a copy to keep $yesterday intact for other callbacks
                        my $day = $yesterday->clone;
                        foreach ( 0 .. MAX_DAYS() ) {
                            push @period, $day->ymd();
                            $day = $day->subtract( 'days' => 1 );
                        }

                        my @period_data = $country_data->{'All'}{'dates'}->@{@period};
                        my $stat        = Statistics::Descriptive::Full->new();
                        $stat->add_data($_) for @period_data;

                        $def->resolve( $country => $stat->trimmed_mean( LTRIM() ) );
                    });

                    push @promises, $def->promise();
                }

                return collect(@promises);
            }))->then( delayed( sub (@stats_by_country) {
                content "By country (period of " . MAX_DAYS() . " days):\n";
                content "- $_->[0]: $_->[1]\n" for @stats_by_country;
            }))->catch( delayed( sub ($error) {
                content($error);
            }))->finally( delayed( sub (@args) {
                content "\nThank you for visiting our API\n";
                done();
            }));
        };
    };

    1;
Application runner and running
Our app.psgi file is simple enough:

    use CovidStats::Promises;
    CovidStats::Promises->to_app();
We can run this with Twiggy in the following manner:
    $ plackup -s Twiggy bin/app.psgi
    Twiggy: Accepting connections at http://0.0.0.0:5000/
Normally, plackup is very good at recognizing which server to use. If we don't specify Twiggy, it will still get it right:

    $ plackup bin/app.psgi
    Twiggy: Accepting connections at http://0.0.0.0:5000/
Of course, in production you would set up something more elaborate instead of running this in a terminal. I suggest looking at Dancer2::Manual::Deployment for production use.
Testing out our application
In another terminal, we will run the following command:
    $ curl localhost:5000
    By country (period of 7 days):
    - US: 16640229.25
    - India: 9690261.25
    - Brazil: 6294810
    - France: 2118033.5
    - Russia: 1854813.5
    - United Kingdom: 1110655.5

    Thank you for visiting our API
Done!
Where did the condvar go?
The studious might notice there's no condvar ($cv) used in our example. This is because Promises automatically handles that part. It also identifies that we're using AnyEvent, so it uses AnyEvent and its condvars for implementing the event loop handling.
You said something about a surprise?
In the previous article we discussed options other than AnyEvent. If you're interested in writing async code, I suggest looking into IO::Async.
To get you started, I implemented the same exercise using IO::Async. Luckily, because it already uses Future (a more advanced version of the Promises pattern), we don't need to write one version with plain syntax and then another with Promises. It will provide us with this interface by default.
    package CovidStats::IOAsync;
    use Dancer2;
    use experimental qw< postderef signatures >;
    use DateTime;
    use IO::Async;
    use IO::Async::Loop;
    use Future::Utils qw< fmap >;
    use Net::Async::HTTP;
    use Statistics::Descriptive::Full;
    use URI::Escape qw< uri_escape >;

    use constant {
        'MAX_COUNTRIES' => 5,
        'MAX_DAYS'      => 14,
        'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
        'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
        'LTRIM'         => '0.25',
    };

    get '/' => sub {
        return delayed {
            flush();

            my $loop = IO::Async::Loop->new();
            my $http = Net::Async::HTTP->new();
            $loop->add($http);

            my $main_req = $http->GET( SUMMARY_URL() )->then( sub ($res) {
                my $data = from_json( $res->content );

                my @countries = (
                    sort { $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'} }
                    grep $_ ne 'Global', keys $data->%*
                )[ 0 .. MAX_COUNTRIES() ];

                if ( !@countries ) {
                    content("Sorry, failed to receive countries\n");
                    done();
                    return;
                }

                return \@countries;
            })->then( sub ($countries) {
                my $yesterday = DateTime->now->subtract( 'days' => 1 );

                fmap( sub ($country) {
                    my $data_url = sprintf COUNTRY_URL(), uri_escape($country);

                    return $http->GET($data_url)->then( sub ($res) {
                        my $country_data = from_json( $res->content );

                        my @period;
                        # DateTime math changes the object in place, so work on
                        # a copy to keep $yesterday intact for other callbacks
                        my $day = $yesterday->clone;
                        foreach ( 0 .. MAX_DAYS() ) {
                            push @period, $day->ymd();
                            $day = $day->subtract( 'days' => 1 );
                        }

                        my @period_data = $country_data->{'All'}{'dates'}->@{@period};
                        my $stat        = Statistics::Descriptive::Full->new();
                        $stat->add_data($_) for @period_data;

                        return [ $country => $stat->trimmed_mean( LTRIM() ) ];
                    })->catch( sub ($error) {
                        content("Sorry, failed to fetch data for $country: $error");
                    });
                },
                'foreach'    => $countries,
                'concurrent' => scalar $countries->@*,
                )->then( sub (@results) {
                    content( "By country (period of " . MAX_DAYS() . " days):\n" );
                    content( "- $_->[0]: $_->[1]\n" )
                        for sort { $b->[1] <=> $a->[1] } @results;
                    content("\nThank you for visiting our API\n");
                    done();
                });
            })->catch( sub ($error) {
                content("Sorry, failed to receive data: $error\n");
                done();
            });

            $loop->await($main_req);
        };
    };

    1;
We can run it with plackup in a very similar way, but we will use a different web server. Luckily, the author of IO::Async has done us the service of writing a PSGI web server in IO::Async, so we can use that.

Our app.psgi file:

    use CovidStats::IOAsync;
    CovidStats::IOAsync->to_app();
We can run this with Net::Async::HTTP::Server in the following manner:
    $ plackup -s Net::Async::HTTP::Server bin/app.psgi
    Plack::Handler::Net::Async::HTTP::Server: Accepting connections at http://0:5000/
I recommend checking out IO::Async. It comes with a host of useful, advanced, modern tools. It has a lot of thorough documentation. It has a lively community, and a responsive and supportive author.
You can read more about IO::Async here and in Paul Evans' Advent Calendar 2020.
Author
This article has been written by Sawyer X for the Perl Dancer Advent Calendar 2020.
Copyright
No copyright retained. Enjoy.
Sawyer X.