Asynchronous Dancer2 PT. 2 - Promises and a Bonus!

Now that you're familiar with asynchronous programming in Dancer2, we can delve into taking awkward async code and turning it into more manageable code using Promises.

Awkward?

Looking at the code in the previous article, you might see how our code begins to shift to the right with growing callbacks. Each async call requires providing an anonymous subroutine that will get triggered once the async request (whether API request or DB call) returns. Our code will start to look like an arrow.

use experimental qw< postderef signatures >;

do_this_async( sub ($foo) {
    do_this_other_thing_async( $foo, sub ($bar) {
        foreach my $baz ( $bar->@* ) {
            do_more_async( $baz, sub ($quux) {
                ...
            });
        }
    });
});

Eventually, this can become quite unwieldy. Promises are a pattern that can help us tame our code. The Promises module on CPAN is an exceptional implementation of this pattern, so we will use it.

So what are Promises?

Promises are a simple mechanism for turning that arrow pattern above into a clear straight chain where each callback is stored in an object using a method, and the eventual value can be used in another callback, again stored in the same object using a method.

Promises also provides us with comfortable method chaining.

Let's take the above code and rewrite it using the Promises syntax:

use experimental qw< postderef signatures >;

do_this_async()->then( sub ($foo) {
    return do_this_other_thing_async($foo)
})->then( sub ($bar) {
    my @promises = map { do_more_async($_) } $bar->@*;

    return collect(@promises);
})->then( sub (@results) {
    ...
});

Assuming that do_this_async returns a Promise object, we can call then to tack on a callback for the result that this method will eventually retrieve.

We can then call another method (do_this_other_thing_async), which will also return a Promise, to which we tack another callback that will use the value we retrieved from the previous method.

Notice everything here so far has been done with callbacks. We don't execute anything except the initial do_this_async. The rest simply registers callbacks. When it all gets executed at the end, this smart chaining will cause them to be triggered in the right order, connecting them together.

Next, we call a method do_more_async, which again returns a Promise, for each of our inputs. We collect those in a variable and call the Promises function collect, which creates a single promise from a set of them. We can then tack a callback onto it using then.

Of course, we will need to have these functions return Promise objects, so you'll get to see that.

Okay, so how do we rewrite this?

You might not have fully understood how Promises work - that's okay. But even if you did, you might not know how to move our previous code to this. No matter which category you fall in (maybe both?), we're now going to do it together.

Update our imports

package CovidStats::Promises;

use Dancer2;
use experimental qw< postderef signatures >;

use DateTime;
use AnyEvent;
use AnyEvent::HTTP;
use Promises qw< collect deferred >;
use Statistics::Descriptive::Full;
use URI::Escape qw< uri_escape >;

use constant {
    'MAX_COUNTRIES' => 5,
    'MAX_DAYS'      => 14,
    'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
    'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
    'LTRIM'         => '0.25',
};

Similar to before, but this time we also load Promises.

Starting with the initial request

The big benefit of Promises is that it changes the order of writing (without changing the order of execution), so while we previously started by writing the code for the end result, we don't need to do this now. We can begin directly with the first request.

We will need to turn this all to Promises, so we start with creating a deferred object:

get '/' => sub {
    return delayed {
        flush();

        my $def = deferred();

We can decide that this succeeded (using resolve) or failed (using reject) and we can retrieve the Promise object from it. (We're covering Promises loosely here, so you should probably read the Promises documentation to fully comprehend the syntax.)
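To make resolve and reject a little more concrete, here is a minimal sketch, separate from our application. The fetch_or_fail helper and its values are made up for illustration, and the sketch assumes the module's default backend, which runs callbacks as soon as the deferred is settled. It passes then two callbacks: the first runs on resolve, the second on reject.

```perl
use strict;
use warnings;
use Promises qw< deferred >;

# Hypothetical helper (not part of the article's app): settles a deferred
# one way or the other and hands back its promise.
sub fetch_or_fail {
    my ($ok) = @_;
    my $def = deferred();

    if ($ok) {
        $def->resolve('payload');           # mark as succeeded, with a value
    }
    else {
        $def->reject('request failed');     # mark as failed, with a reason
    }

    return $def->promise;
}

my %outcome;
for my $ok ( 1, 0 ) {
    fetch_or_fail($ok)->then(
        sub { $outcome{$ok} = "resolved with $_[0]" },    # success callback
        sub { $outcome{$ok} = "rejected with $_[0]" },    # failure callback
    );
}

print "$_: $outcome{$_}\n" for sort keys %outcome;
```

In a real async function, the resolve or reject call would sit inside the event loop callback (as our http_get callback does), not directly in the function body.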

Now the request:

http_get SUMMARY_URL(), sub ( $body, $hdrs ) {
    my $data = from_json($body);
    $def->resolve($data)
};

What we do here is make a request. When it responds (whenever that is), we make sure to mark the promise object as resolved. We also send it what we got so it can be used as the eventual data in the Promise.

We can now get the Promise object by calling the promise method on the deferred object. Once we have the Promise object, we can start tacking on the rest of the logic.

$def->promise->then( sub ($data) {
    my @countries = ( sort {
        $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
    } grep $_ ne 'Global', keys $data->%* )[ 0 .. MAX_COUNTRIES() ];

    if ( !@countries ) {
        die "Sorry, failed to receive countries\n";
    }

    return \@countries;
})->then( delayed( sub ($countries) {...} ) );

We start by getting the Promise (by calling promise on the deferred) and from now on, we can just start chaining then methods. The $data variable will eventually (once it actually gets called) be the parameter that we sent to the resolve method above. That's the chaining that is happening here.

We can define the top countries and return them. We can also just die when something fails because any die within Promises is caught and managed using an exception catching block. We'll get to it at the end.

Notice this time, we don't use a delayed block because we're not calling any Dancer2 keywords (like content or done).

This response will return and be wrapped in a Promise, which then allows us to chain another then to it with a callback. The parameter will be the countries.
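The way a returned value travels down the chain can be sketched on its own, without Dancer2 or HTTP. The data here is invented for illustration, and the sketch assumes the default Promises backend, which triggers callbacks as soon as the deferred is resolved.

```perl
use strict;
use warnings;
use Promises qw< deferred >;

my $def = deferred();
my @steps;

$def->promise->then( sub {
    my ($data) = @_;

    # A plain (non-promise) return value is wrapped for us and becomes
    # the argument of the next callback in the chain.
    my @sorted = sort { $data->{$b} <=> $data->{$a} } keys %{$data};
    return \@sorted;
})->then( sub {
    my ($names) = @_;
    push @steps, @{$names};
    return;
});

# Nothing has run so far; resolving the deferred sets the chain in motion.
$def->resolve( { 'low' => 1, 'high' => 3, 'mid' => 2 } );

print "@steps\n";
```

The first callback receives what was passed to resolve, and the second receives what the first one returned, just like $data and $countries in our route.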

You might be wondering why we create two code blocks instead of keeping one big code block. Theoretically, we could, but separating the work into multiple blocks is more readable and also allows the event loop to run other things between those code blocks if necessary. In short, this is just good practice.

Multiple requests with Promises

Here is a bit of a trick. A promise is meant for only one value, which normally means only one asynchronous action. We would need to collect multiple Promises - one Promise per API request (per country, based on our API). We can do that and then create a single Promise from all of them using... collect!

})->then( delayed( sub ($countries) {
    my $yesterday = DateTime->now->subtract( 'days' => 1 );

    my @promises;
    foreach my $country ( $countries->@* ) {
        my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
        my $def      = deferred();

        http_get $data_url, delayed( sub ( $body, $hdrs ) {
            my $country_data = from_json($body);

            my @period;

            # clone() matters: subtract() changes the DateTime object in place
            my $day = $yesterday->clone;
            foreach ( 0 .. MAX_DAYS() ) {
                push @period, $day->ymd();
                $day = $day->subtract( 'days' => 1 );
            }

            my @period_data = $country_data->{'All'}{'dates'}->@{@period};

            my $stat = Statistics::Descriptive::Full->new();
            $stat->add_data($_)
                for @period_data;

            $def->resolve( $country => $stat->trimmed_mean( LTRIM() ) );
        });

        push @promises, $def->promise();
    }

    return collect(@promises);
}))->then( delayed( sub (@stats_by_country) {...} ) );

Notice we repeat the same pattern: we create a deferred object for each request, make the request, add the Promise from the deferred object to an array, and eventually call collect to create a single Promise.

The then callback will receive a list in which each element is an array reference holding the values that the corresponding Promise was resolved with (here, the country and its trimmed mean). The order will not be by execution, but by how we inserted them into the original @promises array.
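Here is a small sketch of what collect hands to the next callback. The names first and second are placeholders, and the deferred objects are resolved by hand (in reverse order) instead of by HTTP responses. It assumes the default Promises backend.

```perl
use strict;
use warnings;
use Promises qw< collect deferred >;

my @names        = qw< first second >;
my %deferred_for = map { $_ => deferred() } @names;

my @rows;
collect( map { $deferred_for{$_}->promise } @names )->then( sub {
    my @results = @_;    # one array reference per promise

    # Each element holds the values given to that promise's resolve().
    @rows = map { "$_->[0]=$_->[1]" } @results;
    return;
});

# Resolve out of order, each with two values, as our country code does.
$deferred_for{'second'}->resolve( 'second' => 2 );
$deferred_for{'first'}->resolve( 'first' => 1 );

print "$_\n" for @rows;
```

Even though second finishes before first, the results come back in the order the promises were given to collect.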

Handle the result

Simple enough:

}))->then( delayed( sub (@stats_by_country) {
    content "By country (period of " . MAX_DAYS() . " days):\n";
    content "- $_->[0]: $_->[1]\n"
        for @stats_by_country;
}))...

Add an exception catch block and our final block

Promises give us the catch block to catch exceptions and a finally block to handle the end, whether it worked or not.

In this case, we can use them as such:

}))->catch( delayed( sub ($error) {
    content($error);
}))->finally( delayed( sub (@args) {
    content "\nThank you for visiting our API\n";
    done();
}));

We caught the error and sent it to the user. We end everything by calling done in the finally block.
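The flow of a die through the chain can be tried in isolation. The following is a sketch under the same assumption as before (default Promises backend, no Dancer2), with an empty array reference standing in for a bad API response.

```perl
use strict;
use warnings;
use Promises qw< deferred >;

my $def = deferred();
my @log;

$def->promise->then( sub {
    my ($data) = @_;
    die "Sorry, no countries\n" if !@{$data};    # turns into a rejection
    return $data;
})->then( sub {
    push @log, 'second then';    # skipped, because the previous step died
    return;
})->catch( sub {
    my ($error) = @_;
    push @log, "caught: $error";
    return;
})->finally( sub {
    push @log, 'finally';        # runs whether the chain failed or not
    return;
});

$def->resolve( [] );

print @log;
```

The second then never runs: the exception skips straight to catch, and finally runs afterwards regardless.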

Full program

The full program is:

package CovidStats::Promises;

use Dancer2;
use experimental qw< postderef signatures >;

use DateTime;
use AnyEvent;
use AnyEvent::HTTP;
use Promises qw< collect deferred >;
use Statistics::Descriptive::Full;
use URI::Escape qw< uri_escape >;

use constant {
    'MAX_COUNTRIES' => 5,
    'MAX_DAYS'      => 14,
    'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
    'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
    'LTRIM'         => '0.25',
};

get '/' => sub {
    return delayed {
        flush();

        my $def = deferred();

        http_get SUMMARY_URL(), delayed( sub ( $body, $hdrs ) {
            my $data = from_json($body);
            $def->resolve($data)
        });

        $def->promise->then( delayed( sub ($data) {
            my @countries = ( sort {
                $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
            } grep $_ ne 'Global', keys $data->%* )[ 0 .. MAX_COUNTRIES() ];

            if ( !@countries ) {
                die "Sorry, failed to receive countries\n";
            }

            return \@countries;
        }))->then( delayed( sub ($countries) {
            my $yesterday = DateTime->now->subtract( 'days' => 1 );

            my @promises;
            foreach my $country ( $countries->@* ) {
                my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
                my $def      = deferred();

                http_get $data_url, delayed( sub ( $body, $hdrs ) {
                    my $country_data = from_json($body);

                    my @period;

                    # clone() matters: subtract() changes the DateTime object in place
                    my $day = $yesterday->clone;
                    foreach ( 0 .. MAX_DAYS() ) {
                        push @period, $day->ymd();
                        $day = $day->subtract( 'days' => 1 );
                    }

                    my @period_data = $country_data->{'All'}{'dates'}->@{@period};

                    my $stat = Statistics::Descriptive::Full->new();
                    $stat->add_data($_)
                        for @period_data;

                    $def->resolve( $country => $stat->trimmed_mean( LTRIM() ) );
                });

                push @promises, $def->promise();
            }

            return collect(@promises);
        }))->then( delayed( sub (@stats_by_country) {
            content "By country (period of " . MAX_DAYS() . " days):\n";
            content "- $_->[0]: $_->[1]\n"
                for @stats_by_country;
        }))->catch( delayed( sub ($error) {
            content($error);
        }))->finally( delayed( sub (@args) {
            content "\nThank you for visiting our API\n";
            done();
        }));

    };
};

1;

Application runner and running

Our app.psgi file is simple enough:

use CovidStats::Promises;
CovidStats::Promises->to_app();

We can run this with Twiggy in the following manner:

$ plackup -s Twiggy bin/app.psgi
Twiggy: Accepting connections at http://0.0.0.0:5000/

Normally, plackup is very good at recognizing which server to use. Even if we don't specify Twiggy, it will still get it right:

$ plackup bin/app.psgi
Twiggy: Accepting connections at http://0.0.0.0:5000/

Of course, in production you would set up something more elaborate instead of running this on a terminal. I suggest looking at Dancer2::Manual::Deployment for production use.

Testing out our application

On another terminal, we will run the following command:

$ curl localhost:5000
By country (period of 7 days):
- US: 16640229.25
- India: 9690261.25
- Brazil: 6294810
- France: 2118033.5
- Russia: 1854813.5
- United Kingdom: 1110655.5

Thank you for visiting our API

Done!

Where did the condvar go?

The studious might notice there's no condvar ($cv) used in our example. This is because Promises automatically handles that part. It also identified that we're using AnyEvent so it used AnyEvent and its condvars for implementing the event loop handling.

You said something about a surprise?

In the previous article we discussed options other than AnyEvent. If you're interested in writing async code, I suggest looking into IO::Async.

To get you started, I implemented the same exercise using IO::Async. Luckily, because it already uses Future (a more advanced version of the Promises pattern), we don't need to write one version with plain syntax and then another with Promises. It will provide us with this interface by default.

package CovidStats::IOAsync;

use Dancer2;
use experimental qw< postderef signatures >;

use DateTime;
use IO::Async;
use IO::Async::Loop;
use Future::Utils qw< fmap >;
use Net::Async::HTTP;
use Statistics::Descriptive::Full;
use URI::Escape qw< uri_escape >;

use constant {
    'MAX_COUNTRIES' => 5,
    'MAX_DAYS'      => 14,
    'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
    'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
    'LTRIM'         => '0.25',
};

get '/' => sub {
    return delayed {
        flush();

        my $loop = IO::Async::Loop->new();
        my $http = Net::Async::HTTP->new();
        $loop->add($http);

        my $main_req = $http->GET( SUMMARY_URL() )->then( sub ($res) {
            my $data      = from_json( $res->content );
            my @countries = ( sort {
                $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
            } grep $_ ne 'Global', keys $data->%* )[ 0 .. MAX_COUNTRIES() ];

            if ( !@countries ) {
                content("Sorry, failed to receive countries\n");
                done();
                return;
            }

            return \@countries;
        })->then( sub ($countries) {
            my $yesterday = DateTime->now->subtract( 'days' => 1 );

            fmap( sub ($country) {
                my $data_url = sprintf COUNTRY_URL(), uri_escape($country);

                return $http->GET($data_url)->then( sub ($res) {
                    my $country_data = from_json( $res->content );

                    my @period;
                    # clone() matters: subtract() changes the DateTime object in place
                    my $day = $yesterday->clone;
                    foreach ( 0 .. MAX_DAYS() ) {
                        push @period, $day->ymd();
                        $day = $day->subtract( 'days' => 1 );
                    }

                    my @period_data = $country_data->{'All'}{'dates'}->@{@period};

                    my $stat = Statistics::Descriptive::Full->new();
                    $stat->add_data($_)
                        for @period_data;

                    return [ $country => $stat->trimmed_mean( LTRIM() ) ];
                })->catch( sub ($error) {
                    content("Sorry, failed to fetch data for $country: $error");
                });
            }, 'foreach'    => $countries,
               'concurrent' => scalar $countries->@*,
            )->then( sub (@results) {
                content( "By country (period of " . MAX_DAYS() . " days):\n" );
                content( "- $_->[0]: $_->[1]\n" )
                    for sort { $b->[1] <=> $a->[1] } @results;

                content("\nThank you for visiting our API\n");

                done();
            });
        })->catch( sub ($error) {
            content("Sorry, failed to receive data: $error\n");
            done();
        });

        $loop->await($main_req);
    };
};

1;
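The fmap call in the program above does the job that collect did in the Promises version. To see it without any network access, here is a rough sketch in which a made-up fake_fetch function returns an already-completed Future instead of calling $http->GET. The country names and the "value" (just the length of the name) are placeholders.

```perl
use strict;
use warnings;
use Future;
use Future::Utils qw< fmap >;

# Hypothetical stand-in for $http->GET(...): an already-completed Future.
sub fake_fetch {
    my ($country) = @_;
    return Future->done( length $country );
}

my @countries = qw< US India Brazil >;

my $all = fmap( sub {
    my ($country) = @_;

    return fake_fetch($country)->then( sub {
        my ($value) = @_;
        # Explicitly return a Future carrying one [ name, value ] pair.
        return Future->done( [ $country => $value ] );
    });
}, 'foreach'    => [ @countries ],    # pass a copy of the list
   'concurrent' => scalar @countries,
);

# get() returns the collected results, in the order of the input list.
my @results = $all->get;

print "- $_->[0]: $_->[1]\n" for @results;
```

Each item produces one Future, and fmap gathers their results into a single Future, which is what the final then in our route receives as @results.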

We can run it with plackup in a very similar way, but we will use a different web server. Luckily, the author of IO::Async has done us the service of writing a PSGI web server on top of IO::Async, so we can use that.

Our app.psgi file:

use CovidStats::IOAsync;
CovidStats::IOAsync->to_app();

We can run this with Net::Async::HTTP::Server in the following manner:

$ plackup -s Net::Async::HTTP::Server bin/app.psgi
Plack::Handler::Net::Async::HTTP::Server: Accepting connections at http://0:5000/

I recommend checking out IO::Async. It comes with a host of useful, advanced, modern tools. It has a lot of thorough documentation. It has a lively community, and a responsive and supportive author.

You can read more about IO::Async here and in Paul Evans' Advent Calendar 2020.

Author

This article has been written by Sawyer X for the Perl Dancer Advent Calendar 2020.

Copyright

No copyright retained. Enjoy.

Sawyer X.