Asynchronous Dancer2 PT. 1 - What, Why, and How?

Dancer2 has support for asynchronous programming, allowing you to:

  • Write code that can run multiple operations at the same time
  • Write code that streams the response to the user
  • Run code in the background after the user finished the request

These are provided by the Delayed Response capability, which was also covered in a previous post.

This sounds familiar...

If you think you've heard this before, it might be because of this article from the Perl Advent Calendar of 2016, in which we cover the asynchronous interface that Dancer2 provides.

But just because we wrote about it in the past, it does not mean we can't write about it again. Some messages are worth repeating. :)

So what's this about?

This two-part article will teach you about writing asynchronous code using promises.

This first part will focus on what asynchronous programming is, giving you some context, and we will even write some asynchronous code.

In the next part, we will focus on rewriting this code using the Promises pattern, making use of the Promises module.

But wait, why would I even want asynchronous code?

There are several cases in which asynchronous code execution is beneficial.

  • Imagine you need to make multiple database requests to calculate a response for the user. You need to request the user data, then the reservations for the user, and the latest messages they might have. Given the user ID from the session, these DB calls don't depend on each other, but alas, you must wait for each one to finish before you call the next one.

    If you're using asynchronous programming, you could trigger multiple DB calls at the same time and wait for them to finish.

  • Most web applications are a set of transactions: User makes a request, the app figures out a response and returns it. However, some applications can make a full-blown conversation. For example, a chat app will continuously provide information to the user.

    When I worked as a sysadmin, we had one interface that had to be restarted every day. The restart required logging in, setting up the shell (old OS), finding the process, and issuing a process restart.

    I eventually wrote a small web interface that used SSH keys to connect to the server, run all the commands, and finally restart the process. I used streamed responses to continuously show the progress as it happened, instead of only showing the result at the end.

  • Lastly, your application might be doing a lot of work at the end of a request, like logging everything that happened, or the timing of numerous operations during the request for analytical purposes. This would require calculations, comparisons, summarizing, and storing.

    With an asynchronous interface (that supports post-response actions), you could respond to the user and while they continue on their merry way, your app would continue with the logging (and possible cleanups) asynchronously.

Over time, we got used to web applications being transaction-based, so we created queue managers to help deal with it. We provide all work that should be done to the queue manager and it will run it in the background while the application continues.

Queue managers are very useful, but they don't solve every situation, and in some cases, they are just unnecessary overhead.

Asynchronous, non-blocking, streaming, what?

There are multiple definitions floating around. There are differences between them, but for our purposes, they can all be viewed as similar enough to be used interchangeably.

To be slightly more technical (without the 100 lines of text I have decided to spare you), asynchronous code is an umbrella term. We will write asynchronous code that works using an event loop and non-blocking calls. Streaming is the term for continuously feeding a stream of information, as opposed to sending a single response.
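
To make the event loop and non-blocking parts concrete, here is a minimal sketch that uses AnyEvent timers as stand-ins for slow I/O. The delays and labels are made up for illustration. The only point is that both operations are registered up front and complete in the order they become ready, not in the order they were written.

```perl
use strict;
use warnings;
use AnyEvent;

# A condition variable lets us wait until the slower operation is done.
my $cv = AnyEvent->condvar;
my @order;

# Registered first, but finishes last (hypothetical 0.2 second operation).
my $slow = AnyEvent->timer(
    after => 0.2,
    cb    => sub { push @order, 'slow'; $cv->send },
);

# Registered second, but finishes first (hypothetical 0.1 second operation).
my $fast = AnyEvent->timer(
    after => 0.1,
    cb    => sub { push @order, 'fast' },
);

# Run the event loop until $cv->send is called.
$cv->recv;

print "Completion order: @order\n";
```

Neither timer blocks the other: the whole script takes roughly as long as the slower timer, not the sum of both.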

What event loop will we use?

For this example, we will use the AnyEvent event loop. However, there are other event loops you should also consider, mainly IO::Async.

The example we use here can be written equally well with IO::Async. If you have any issues with AnyEvent, we suggest researching IO::Async.

The Plack web server we will use with AnyEvent is Twiggy, but if you're using IO::Async, you can use Net::Async::HTTP::Server.

So what example are we using here?

For our example, we will build a small application that, when called, will reach a Covid-19 API to retrieve the top countries with confirmed cases. Then it will fetch each country's confirmed cases for the last period and create a trimmed mean / truncated mean.

  • Why?

    This small app provides us with an example of making a single request (in our case, an API, but it can be a DB call just the same) and then making multiple concurrent requests (again, through an API, but could be a DB call).

  • Could this be cached daily?

    Theoretically, yes. We're not aiming for the most optimal code, just an example contrived enough to deliver the message without being entirely useless.

  • What's a trimmed mean?

    Mean (average) is not a very reliable metric, since it can be easily offset by outliers. You have ten good grades and one really bad one. With mean, you wouldn't look like such a good student. However, if we trimmed outliers, we would be able to see you as a good student with one crappy grade.

    I'm not a statistician, nor am I especially good with math, so if you disagree and have a better function, go ahead and use that.
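
As a rough illustration of the idea, here is a hand-rolled sketch of a trimmed mean that uses only core Perl. It is not the implementation from Statistics::Descriptive (which the article uses later), and the function names and grades are made up. It assumes the same fraction is dropped from both ends of the sorted data.

```perl
use strict;
use warnings;
use List::Util qw< sum >;

# Plain arithmetic mean.
sub plain_mean {
    my @values = @_;
    return sum(@values) / scalar @values;
}

# Drop a fraction of the values from each end, then average the rest.
# Assumes $fraction is below 0.5 so that at least one value remains.
sub trimmed_mean {
    my ( $fraction, @values ) = @_;
    my @sorted = sort { $a <=> $b } @values;
    my $drop   = int( $fraction * scalar @sorted );
    my @kept   = @sorted[ $drop .. $#sorted - $drop ];
    return sum(@kept) / scalar @kept;
}

# Ten good grades and one really bad one (made-up numbers).
my @grades = ( 88, 90, 92, 85, 91, 89, 93, 87, 90, 95, 20 );

printf "mean:         %.2f\n", plain_mean(@grades);
printf "trimmed mean: %.2f\n", trimmed_mean( 0.1, @grades );
```

With these grades the plain mean is about 83.6, while trimming one value from each end gives about 89.4, which is closer to what the student usually scores.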

Where's the code?

The basics

First, our initial code:

package CovidStats;
use Dancer2;

# This is still required on the version of Perl we're using
# but this won't be "experimental" for much longer
use experimental qw< postderef signatures >;

# Some modules we'll be using
use DateTime;
use AnyEvent;
use AnyEvent::HTTP; # http_get
use Statistics::Descriptive::Full;
use URI::Escape qw< uri_escape >;

# A few constants, to keep things flexible
use constant {
    'MAX_COUNTRIES' => 5,
    'MAX_DAYS'      => 7,
    'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
    'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
    'LTRIM'         => '0.25',
};

Now, the routing

We will set up only one route (/) and it will respond in text instead of HTML, just for simpler interaction.

get '/' => sub {
    return delayed {
        flush();
        content("hi!");
        done();
    };
};

First, we return a delayed response. This means our code will be asynchronous. The delayed keyword is required for two things:

  • Create the initial asynchronous response

    This allows Dancer2 to declare to the web server that it is running asynchronous code.

  • Any time we have asynchronous code blocks

    Whenever we have a subroutine that needs access to the Dancer2 DSL, we need to change the sub into a delayed sub.

Using delayed can be done without parentheses (just like other Dancer2 keywords) and without the sub keyword. You can also use it with parentheses and the sub keyword, which will allow you to make use of subroutine signatures.

The following are equivalent:

return delayed {...};
return delayed( sub {...} );

We will be using both styles in this example.

The flush keyword will start streaming our information. Each content call will send data. It can be used multiple times and, if we forget to call flush, it will be called the first time we call content.

The done keyword tells Dancer2 to tell the web server that we're done and it can close the connection with the user. We can run additional code afterward, but we don't have any in our example.
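
To show content being called more than once, here is a small sketch of a single-file app that streams three chunks before closing the connection. The package name StreamDemo and the /steps route are hypothetical and not part of the application built in this article.

```perl
package StreamDemo;
use Dancer2;

# Hypothetical route: streams three chunks, then closes the connection.
get '/steps' => sub {
    return delayed {
        flush();    # start streaming

        # Each content() call sends one more chunk to the client.
        foreach my $step ( 1 .. 3 ) {
            content("step $step of 3\n");
        }

        done();     # tell the web server we are finished
    };
};

# The PSGI application, returned when this file is used as an app.psgi.
StreamDemo->to_app();
```

Nothing here is asynchronous yet. The loop runs straight through, but the response is already sent in pieces rather than as one body.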

Expanding it to make the first request

What we want now is to make a request to retrieve the top countries with confirmed cases.

get '/' => sub {
    return delayed {
        flush();

        my $cv = AnyEvent->condvar();
        $cv->cb( delayed {
            content("Retrieved countries");
            done();
        });

        $cv->begin();
        http_get SUMMARY_URL(), sub ( $body, $hdrs ) {
            $cv->end();
        };
    };
};

Here we set up a condvar (condition variable) to track state. The cb call with the delayed subroutine block indicates what to do when the asynchronous code that follows finishes running.

Yes, we declare first what we do when code ends and only then write the code. Welcome to asynchronous code. You might also now understand why Promises is such a popular pattern, which we will see in the next part of this series.

We then provide the async code to run, namely an HTTP request to our API. The subroutine calling $cv->end notes the end of the async code and will trigger the $cv->cb code we set up.

Let's add some data validation

get '/' => sub {
    return delayed {
        flush();

        my $cv = AnyEvent->condvar();
        $cv->cb( delayed { done(); } );

        $cv->begin();
        http_get SUMMARY_URL(), delayed( sub ( $body, $hdrs ) {
            my $data;
            eval {
                $data = from_json($body);
                1;
            } or do {
                # eval puts the exception in $@ ($! is the OS error code)
                content("Sorry, failed to fetch data: $@");
                $cv->end();
                return;
            };

            ...
        });
    };
};

In this case, we added some validation for our JSON response. We also moved to using delayed so we could access the content and done keywords. You'll notice we're using delayed with parentheses and the sub keyword, so we could continue using subroutine signatures.

Filtering and sorting

Our next goal is to pick the top countries based on the most confirmed cases using a simple sort. I won't go into the data structure the API returns because that's the least valuable part here.

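In short, we take all of our countries, excluding the "Global" category the API provides, sort them by each country's confirmed key in descending order, and keep only the leading entries. Note that the slice 0 .. MAX_COUNTRIES() is inclusive on both ends, so with MAX_COUNTRIES set to 5 it keeps six countries, which is why the output further down lists six.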

my @countries = ( sort {
    $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
} grep $_ ne 'Global', keys $data->%* )[ 0 .. MAX_COUNTRIES() ];

if ( !@countries ) {
    content("Sorry, failed to receive countries\n");
    $cv->end();
    return;
}

This code will be run within the http_get callback.
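
To try the filtering and sorting outside of the web application, here is a standalone sketch. The sample data is made up and only mimics the shape used above (a top-level key per country, with the count under All and confirmed). The top_countries helper is hypothetical. Unlike the slice above, it uses $count - 1 as the upper bound, so asking for two countries returns exactly two.

```perl
use strict;
use warnings;

# Made-up sample, shaped like the structure the snippet above reads.
my $data = {
    'Global' => { 'All' => { 'confirmed' => 1300 } },
    'Alpha'  => { 'All' => { 'confirmed' => 300 } },
    'Beta'   => { 'All' => { 'confirmed' => 500 } },
    'Gamma'  => { 'All' => { 'confirmed' => 100 } },
    'Delta'  => { 'All' => { 'confirmed' => 400 } },
};

# Hypothetical helper: names of the $count countries with the most cases.
sub top_countries {
    my ( $data, $count ) = @_;

    my @sorted = sort {
        $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
    } grep { $_ ne 'Global' } keys %{$data};

    # Never ask for more entries than exist, to avoid undef slots.
    my $last = $count - 1;
    $last = $#sorted if $last > $#sorted;

    return @sorted[ 0 .. $last ];
}

my @top = top_countries( $data, 2 );
print "Top: @top\n";
```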

Introducing multiple concurrent requests

Once we get this list of top countries, we want to get the results for each country's history. Here is where the real magic happens.

We want to make another http_get call for each country and we want these to run concurrently. This way, the total wait stays close to that of the slowest single request rather than the sum of all of them, since they happen at the same time.

(On larger-scale applications, you would likely defend against running too many concurrent requests. The API itself might throttle you as well.)

my $yesterday = DateTime->now->subtract( 'days' => 1 );
foreach my $country (@countries) {
    $cv->begin();

    my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
    http_get $data_url, delayed( sub ( $body, $hdrs ) {
        my $country_data;
        eval {
            $country_data = from_json($body);
            1;
        } or do {
            # eval puts the exception in $@ ($! is the OS error code)
            content("Sorry, failed to fetch data for $country: $@");
            $cv->end();
            return;
        };

        my @period;

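        # subtract() changes the object in place, so work on a copy;
        # otherwise every callback would shift the shared $yesterday
        my $day = $yesterday->clone();
        foreach ( 1 .. MAX_DAYS() ) {
            push @period, $day->ymd();
            $day->subtract( 'days' => 1 );
        }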

        my @period_data = $country_data->{'All'}{'dates'}->@{@period};

        my $stat = Statistics::Descriptive::Full->new();
        $stat->add_data($_)
            for @period_data;

        my $country_stat = $stat->trimmed_mean( LTRIM() );
        $cv->end();
    });

}

We start by calling begin for each request we intend to make. When each request ends, we call end. This allows the condvar to track how many concurrent requests are in flight and, once all of them have finished, to call the finishing callback we created at the beginning.
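
The begin and end counting can be tried on its own, without Dancer2 or HTTP. In this sketch, AnyEvent timers with made-up delays stand in for the per-country requests. The extra outer begin and end pair mirrors the one wrapping the first request in our route, and keeps the counter from reaching zero before every task has been registered.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv = AnyEvent->condvar;

# Hypothetical tasks and how long each one "takes", in seconds.
my %delay = ( 'US' => 0.3, 'India' => 0.1, 'Brazil' => 0.2 );
my ( %timer, @finished );

$cv->begin();    # outer guard, released after the loop
foreach my $name ( sort keys %delay ) {
    $cv->begin();    # one more task in flight
    $timer{$name} = AnyEvent->timer(
        after => $delay{$name},
        cb    => sub {
            push @finished, $name;
            delete $timer{$name};
            $cv->end();    # this task is done
        },
    );
}
$cv->end();      # release the outer guard

# Blocks until the counter drops back to zero.
$cv->recv;

print "Finished in order: @finished\n";
```

All three timers run at the same time, so the script ends after roughly the longest delay rather than the sum of the three.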

We create a proper request URL with sprintf and make a request for each country's data. We then collect the dates of the last few days (controlled by our constant MAX_DAYS), counting back from yesterday, since today's data is not available until the day ends.
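
Building the list of dates can be checked in isolation. The sketch below uses a fixed start date instead of yesterday so the result is reproducible. The period_keys helper is hypothetical. It works on a clone because DateTime's subtract method changes the object it is called on.

```perl
use strict;
use warnings;
use DateTime;

# Hypothetical helper: $days date strings (YYYY-MM-DD), newest first.
sub period_keys {
    my ( $start, $days ) = @_;

    my $day = $start->clone();    # leave the caller's object untouched
    my @period;
    foreach ( 1 .. $days ) {
        push @period, $day->ymd();
        $day->subtract( 'days' => 1 );
    }

    return @period;
}

# Fixed date, chosen only for the example.
my $start  = DateTime->new( 'year' => 2020, 'month' => 12, 'day' => 10 );
my @period = period_keys( $start, 3 );

print join( ', ', @period ), "\n";
print "Start is still ", $start->ymd(), "\n";
```

The resulting strings are the keys used for the hash slice into the dates structure of each country.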

We use Statistics::Descriptive::Full to calculate the trimmed mean.

So far, however, we do nothing with this calculation. What we want is to do something when all of it ends, so let's adjust this a bit.

Updating our finishing callback

In the beginning, we set the cb to just send something to the user and close the connection. Instead, we intend to now store information and display it back to the user:

my %country_weekly;
$cv->cb( delayed {
    content( "By country (period of " . MAX_DAYS() . " days):\n" );
    content( "- $_: $country_weekly{$_}\n" )
        for sort { $country_weekly{$b} <=> $country_weekly{$a} }
            keys %country_weekly;

    content("\nThank you for visiting our API\n");
    done();
});

Our code that calculates the trimmed mean can now use this variable:

my $stat = Statistics::Descriptive::Full->new();
$stat->add_data($_)
    for @period_data;

$country_weekly{$country} = $stat->trimmed_mean( LTRIM() );

Full program

The full program is:

package CovidStats;

use Dancer2;
use experimental qw< postderef signatures >;

use DateTime;
use AnyEvent;
use AnyEvent::HTTP;
use Statistics::Descriptive::Full;
use URI::Escape qw< uri_escape >;

use constant {
    'MAX_COUNTRIES' => 5,
    'MAX_DAYS'      => 7,
    'SUMMARY_URL'   => 'https://covid-api.mmediagroup.fr/v1/cases',
    'COUNTRY_URL'   => 'https://covid-api.mmediagroup.fr/v1/history?country=%s&status=Confirmed',
    'LTRIM'         => '0.25',
};

get '/' => sub {
    return delayed {
        flush();

        my $cv = AnyEvent->condvar();

        my %country_weekly;
        $cv->cb( delayed {
            content( "By country (period of " . MAX_DAYS() . " days):\n" );
            content( "- $_: $country_weekly{$_}\n" )
                for sort { $country_weekly{$b} <=> $country_weekly{$a} }
                    keys %country_weekly;

            content("\nThank you for visiting our API\n");
            done();
        });

        $cv->begin();
        http_get SUMMARY_URL(), delayed( sub ( $body, $hdrs ) {
            my $data;
            eval {
                $data = from_json($body);
                1;
            } or do {
                # eval puts the exception in $@ ($! is the OS error code)
                content("Sorry, failed to fetch data: $@");
                $cv->end();
                return;
            };

            my @countries = ( sort {
                $data->{$b}{'All'}{'confirmed'} <=> $data->{$a}{'All'}{'confirmed'}
            } grep $_ ne 'Global', keys $data->%* )[ 0 .. MAX_COUNTRIES() ];

            if (!@countries) {
                content("Sorry, failed to receive countries\n");
                $cv->end();
                return;
            }

            my $yesterday = DateTime->now->subtract( 'days' => 1 );
            foreach my $country (@countries) {
                $cv->begin();

                my $data_url = sprintf COUNTRY_URL(), uri_escape($country);
                http_get $data_url, delayed( sub ( $body, $hdrs ) {
                    my $country_data;
                    eval {
                        $country_data = from_json($body);
                        1;
                    } or do {
                        # eval puts the exception in $@ ($! is the OS error code)
                        content("Sorry, failed to fetch data for $country: $@");
                        $cv->end();
                        return;
                    };


                    my @period;

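                    # subtract() changes the object in place, so work on a copy;
                    # otherwise every callback would shift the shared $yesterday
                    my $day = $yesterday->clone();
                    foreach ( 1 .. MAX_DAYS() ) {
                        push @period, $day->ymd();
                        $day->subtract( 'days' => 1 );
                    }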

                    my @period_data = $country_data->{'All'}{'dates'}->@{@period};

                    my $stat = Statistics::Descriptive::Full->new();
                    $stat->add_data($_)
                        for @period_data;

                    $country_weekly{$country} = $stat->trimmed_mean( LTRIM() );
                    $cv->end();
                });

            }

            $cv->end();
        });
    };
};

1;

Application runner

Our app.psgi file is simple enough:

use CovidStats;
CovidStats->to_app();

Running

We can run this with Twiggy in the following manner:

$ plackup -s Twiggy bin/app.psgi
Twiggy: Accepting connections at http://0.0.0.0:5000/

Normally, plackup is very good at recognizing which server to use. If we don't specify Twiggy, it will still get it right:

$ plackup bin/app.psgi
Twiggy: Accepting connections at http://0.0.0.0:5000/

Of course, in production you would set up something more elaborate instead of running this in a terminal. I suggest looking at Dancer2::Manual::Deployment for production use.

Testing out our application

On another terminal, we will run the following command:

$ curl localhost:5000
By country (period of 7 days):
- US: 16640229.25
- India: 9690261.25
- Brazil: 6294810
- France: 2118033.5
- Russia: 1854813.5
- United Kingdom: 1110655.5

Thank you for visiting our API

Not bad at all.

Final notes

There is a lot to say here:

  • The example

    This example is fairly contrived. The M-Media-Group API supports retrieving the history for all countries, so this two-step process is unnecessary.

    The calculation we do is not necessarily helpful. You might come up with a better calculation that is more useful and provides more insight.

  • HTML output

    The output in this example is purely text and includes newlines, which is useful in the terminal, but not in the browser. But hey, contrived example!

    For streaming output, you would want self-contained message packets, like small JSON-structured messages. That way, your clients would be able to read each separately and use it.
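
One possible shape for such self-contained packets is one JSON document per line. The sketch below uses the core JSON::PP module. The field names and values are made up, and this is only one of several reasonable framing choices.

```perl
use strict;
use warnings;
use JSON::PP;

# canonical() sorts the keys, so the output is stable.
my $json = JSON::PP->new->canonical;

# Hypothetical per-country results; field names are illustrative.
my @messages = (
    { 'country' => 'US',    'mean' => 100 },
    { 'country' => 'India', 'mean' => 50 },
);

# One JSON document per line: each line can be parsed on its own.
my @lines = map { $json->encode($_) . "\n" } @messages;
print @lines;

# A client can decode each line as soon as it arrives.
my @decoded = map { $json->decode($_) } @lines;
print "$_->{'country'}: $_->{'mean'}\n" for @decoded;
```

In the application above, each of these lines would be passed to content instead of the plain text we send now.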

Author

This article has been written by Sawyer X for the Perl Dancer Advent Calendar 2020.

Copyright

No copyright retained. Enjoy.

Sawyer X.