Creating a site preview like in Slack (using aiohttp)

python, aiohttp, asyncio, slack

In this article we will write a small library that builds a preview for a site. A description of how previews work in Slack can be found here.

Four data sources will be used:

  1. oEmbed
  2. Open Graph
  3. Twitter Cards
  4. HTML meta tags

In this exact order we will try to retrieve the data.
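
The fallback between sources can be sketched in a few lines. This is a minimal illustration rather than the library's code: the helper name first_available and the sample data are made up, and the order of sources follows the sources list used by get_preview_data later in the article.

```python
# Order in which the sources are consulted (taken from get_preview_data).
SOURCE_PRIORITY = ['oembed', 'open_graph', 'twitter_cards', 'meta_tags']


def first_available(data, field, sources=SOURCE_PRIORITY):
    """Return the first non-empty value of `field`, walking sources in order."""
    for source in sources:
        value = data.get(source, {}).get(field)
        if value:
            return value
    return None


# Hypothetical per-source results; real data comes from the parsers.
sample = {
    'open_graph': {'title': 'Title from Open Graph'},
    'meta_tags': {'title': 'Title from meta tags',
                  'description': 'Plain description'},
}
title = first_available(sample, 'title')
description = first_available(sample, 'description')
missing = first_available(sample, 'image')
```

Here the title comes from Open Graph because it has a higher priority than the meta tags, while the description falls through to the only source that provides one.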

Technology stack

For parsing we will use:

Beautiful Soup - to navigate and search through the tree of an HTML document.

html5lib - to parse HTML documents; it handles invalid HTML markup better than the other parsers.

aiohttp - an asynchronous HTTP client for fetching web pages, plus an asynchronous server.

oEmbed

oEmbed is an open format designed to simplify embedding the content of one web page into another. The content can be photos, videos, links or other types of data. The specification and a description of the format can be found here.

The basic idea of the format: a site provides a specific endpoint that accepts GET requests, and the URL of the page you want to extract data for is passed in the request parameters. Here is an example.

Instagram, a popular social network for sharing and rating photos and short videos, supports oEmbed.

Endpoint for receiving data: https://api.instagram.com/oembed

Page example: https://www.instagram.com/p/BOcL9FQFKAU/

The final URL for requesting information about the page (the url parameter has to be percent-encoded): https://api.instagram.com/oembed?url=https%3A%2F%2Fwww.instagram.com%2Fp%2FBOcL9FQFKAU%2F
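
As a minimal sketch, the same URL can be assembled with the standard library; urlencode takes care of the percent-encoding. The helper name build_oembed_request_url is an assumption made for this illustration.

```python
from urllib.parse import urlencode

OEMBED_ENDPOINT = 'https://api.instagram.com/oembed'  # endpoint from the article
PAGE_URL = 'https://www.instagram.com/p/BOcL9FQFKAU/'


def build_oembed_request_url(endpoint, page_url, **extra):
    """Append the percent-encoded page URL (and optional params) to the endpoint."""
    params = {'url': page_url}
    params.update(extra)
    return '{0}?{1}'.format(endpoint, urlencode(params))


request_url = build_oembed_request_url(OEMBED_ENDPOINT, PAGE_URL)
print(request_url)
```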

The main problem is that we need to know whether a site supports oEmbed or not. A complete list of providers does not actually exist, so we have to add the providers we know ourselves. For this, I decided to use a separate JSON file to which providers are added.

The format is as follows (borrowed from the specification site; a small list can be found here):

[
    {
        "provider_name": "Instagram",
        "provider_url": "https://www.instagram.com",
        "endpoints": [
            {
                "schemes": [
                    "https://www.instagram.com/p/*"
                ],
                "url": "https://api.instagram.com/oembed",
                "formats": [
                    "json"
                ]
            }
        ]
    }
]

If a link matches one of the masks listed in schemes, the endpoint specified in url is used.
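
A standalone sketch of this matching rule, assuming that * in a mask stands for any sequence of characters. The helper name mask_to_regex and the second test URL are hypothetical.

```python
import re


def mask_to_regex(mask):
    """Turn a scheme mask with `*` wildcards into an anchored regex."""
    # Escape the literal parts so that characters like `.` match only themselves.
    parts = [re.escape(part) for part in mask.split('*')]
    return re.compile('^{0}$'.format('.*'.join(parts)))


pattern = mask_to_regex('https://www.instagram.com/p/*')
matches_post = bool(pattern.match('https://www.instagram.com/p/BOcL9FQFKAU/'))
matches_profile = bool(pattern.match('https://www.instagram.com/some_user/'))
```

The post URL matches the mask, while the profile URL does not, so only the first one would be sent to the oEmbed endpoint.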

Let's write helper functions to load the list of providers:

import json
import os
import re

from aiounfurl.exceptions import BaseAiounfurlException

OEMBED_PROVIDERS_ENV_VAR_NAME = 'OEMBED_PROVIDERS_FILE'


class LoadOembedProvidersException(BaseAiounfurlException):
    pass


def schema_mask_to_re(url_schema):
    """
    Convert a URL mask into a compiled regexp object
    """
    # Escape the literal parts and let every `*` in the mask match any characters
    schema_re = r'.*'.join(map(re.escape, url_schema.split('*')))
    return re.compile(r'^{0}$'.format(schema_re))


def load_providers():
    """
    Load the providers list from the file specified in the
    OEMBED_PROVIDERS_FILE environment variable
    """
    providers_filepath = os.getenv(OEMBED_PROVIDERS_ENV_VAR_NAME)
    if not providers_filepath:
        return []
    try:
        providers_file = open(providers_filepath)
    except IOError as e:
        msg = "Error loading Oembed providers, I/O error ({0}): {1}"
        raise LoadOembedProvidersException(msg.format(e.errno, e.strerror))
    # The context manager closes the file even if JSON decoding fails
    with providers_file:
        providers = json.load(providers_file)
    return providers


def prepare_providers(providers):
    """
    Process the loaded providers list, converting URL masks into regexps
    """
    result = []
    for provider in providers:
        endpoints = provider.get('endpoints', [])
        for endpoint in endpoints:
            schemes = endpoint.get('schemes')
            endpoint_url = endpoint.get('url')
            if schemes and endpoint_url:
                result.append({
                    'provider_name': provider['provider_name'],
                    'schemes': [schema_mask_to_re(s) for s in schemes],
                    'url': endpoint_url})
    return result

Then the parser itself:

from collections import OrderedDict
from urllib.parse import urlsplit, urlunsplit, urlencode, parse_qsl


class OEmbedURLExtractor(object):

    MIME_TYPES = [
        'application/json+oembed',
        'text/xml+oembed']

    def __init__(self, providers, params=None):
        # we pass in the loaded providers and the oEmbed settings (maxwidth, maxheight)
        self._providers = providers
        self._params = params

    def _build_oembed_url(self, url, provider, data_format):
        """
        Internal method that builds the link to the oEmbed endpoint
        """
        endpoint_url = provider['url'].replace('{format}', data_format)
        scheme, netloc, path, qs, fragment = urlsplit(endpoint_url)
        query_params = OrderedDict(parse_qsl(qs))
        query_params['url'] = url
        query_params['format'] = data_format
        if self._params:
            query_params.update(self._params)
        query_params = urlencode(query_params, True)
        return urlunsplit((scheme, netloc, path, query_params, fragment))

    def get_oembed_url(self, url, data_format='json'):
        """
        Get the oEmbed link from the providers we know (those loaded from the file)
        """
        for provider in self._providers:
            for schema_re in provider['schemes']:
                if schema_re.match(url):
                    return self._build_oembed_url(url, provider, data_format)

    def get_oembed_url_from_html(self, soup):
        """
        oEmbed providers can choose to make their oEmbed support discoverable
        by adding elements to the head of their existing (X)HTML documents.
        """
        for mime_type in self.MIME_TYPES:
            link = soup.find('link', type=mime_type, href=True)
            if link:
                return link['href']

Usage example:

providers = prepare_providers(load_providers())
extractor = OEmbedURLExtractor(providers)
oembed_url = extractor.get_oembed_url('https://www.instagram.com/p/BOcL9FQFKAU/')

If the provider has been added to the file, we get a link to the oEmbed endpoint from which we can receive the data. The get_oembed_url_from_html method is used below, when we retrieve the data.
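
For context, discovery relies on a <link> tag in the page head whose type is one of the oEmbed MIME types. The sketch below shows the idea with the standard library's html.parser instead of Beautiful Soup, so that it has no dependencies; the class name and the sample HTML are made up for illustration.

```python
from html.parser import HTMLParser

OEMBED_MIME_TYPES = ('application/json+oembed', 'text/xml+oembed')


class OEmbedLinkFinder(HTMLParser):
    """Collect href values of <link> tags that advertise an oEmbed endpoint."""

    def __init__(self):
        super().__init__()
        self.links = {}

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        is_oembed = attrs.get('type') in OEMBED_MIME_TYPES
        if tag == 'link' and is_oembed and attrs.get('href'):
            # Keep the first link found for each MIME type.
            self.links.setdefault(attrs['type'], attrs['href'])


# Hypothetical page head, made up for illustration.
SAMPLE_HTML = (
    '<html><head>'
    '<link rel="alternate" type="application/json+oembed" '
    'href="https://example.com/oembed?url=https%3A%2F%2Fexample.com%2Fpost%2F1">'
    '</head><body></body></html>'
)

finder = OEmbedLinkFinder()
finder.feed(SAMPLE_HTML)
discovered = finder.links.get('application/json+oembed')
```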

Open Graph

The Open Graph protocol is a special set of meta tags embedded into the page HTML; the format was created by Facebook. A description of the protocol can be found here. Open Graph data is embedded directly into the page and has a simple structure, so it is quite easy to extract.

Open Graph parser code:

def _add_to_result(result, property_name_parts, content):
    if len(property_name_parts) > 1:
        return result
    property_name = property_name_parts[0]
    if property_name in result.keys():
        if isinstance(result[property_name], list):
            result[property_name].append(content)
        else:
            result[property_name] = [result[property_name], content]
    else:
        result[property_name] = content
    return result


def extract_from_html(soup):
    """
    Takes a Beautiful Soup object as input
    """
    # search all meta tags
    meta = soup.findAll('meta')
    result = {}
    for tag in meta:
        if tag.has_attr('property') and 'og:' in tag['property']:
            tag_property = tag['property']
            property_name_parts = tag_property.replace('og:', '', 1).split(':')
            result = _add_to_result(result, property_name_parts, tag['content'])
    return result

This parser retrieves first-level Open Graph tags and collects repeated tags into arrays, but it does not support nested structures. It can extract data from these tags:

<meta property="og:title" content="The Rock" />
<meta property="og:type" content="video.movie" />
<meta property="og:url" content="http://www.imdb.com/title/tt0117500/" />
<meta property="og:image" content="http://example.com/rock.jpg" />
<meta property="og:image" content="http://example.com/rock2.jpg" />

but not from the nested ones:

<meta property="og:image" content="http://example.com/ogp.jpg" />
<meta property="og:image:secure_url" content="https://secure.example.com/ogp.jpg" />
<meta property="og:image:type" content="image/jpeg" />
<meta property="og:image:width" content="400" />
<meta property="og:image:height" content="300" />
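
If nested structures were needed, one possible approach is to attach each structured property to the most recent root tag, which is how the Open Graph protocol orders them. The sketch below works on plain (property, content) pairs; group_open_graph and the 'value' key are hypothetical names, not part of the parser above.

```python
def group_open_graph(pairs):
    """Group (property, content) pairs into a list of objects per root tag."""
    result = {}
    for prop, content in pairs:
        if not prop.startswith('og:'):
            continue
        root, _, attribute = prop[len('og:'):].partition(':')
        items = result.setdefault(root, [])
        if not attribute:
            # A root tag such as og:image starts a new object.
            items.append({'value': content})
        elif items:
            # A structured property such as og:image:width belongs
            # to the latest root tag seen before it.
            items[-1][attribute] = content
    return result


pairs = [
    ('og:image', 'http://example.com/ogp.jpg'),
    ('og:image:secure_url', 'https://secure.example.com/ogp.jpg'),
    ('og:image:type', 'image/jpeg'),
    ('og:image:width', '400'),
    ('og:image:height', '300'),
    ('og:image', 'http://example.com/rock2.jpg'),
]
images = group_open_graph(pairs)['image']
```

Each og:image tag becomes its own dictionary, and the width, height and type end up next to the image they describe.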

Twitter Cards

This format is very similar to Open Graph: special meta tags containing information about the page are added to the page HTML. The tags are described in the Twitter Cards documentation.

Parser code:

def extract_from_html(soup):
    meta = soup.findAll('meta')
    result = {}
    for tag in meta:
        if tag.has_attr('name') and 'twitter:' in tag['name']:
            tag_name = tag['name']
            # Drop the prefix and flatten nested names, e.g. image:src -> image_src
            property_name = tag_name.replace('twitter:', '', 1).replace(':', '_')
            result[property_name] = tag['content']
    return result

We collect the data from all meta tags whose name attribute contains the twitter: prefix.
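
A small sketch of the key normalization on plain (name, content) pairs. It assumes nested names are flattened with an underscore; the helper name and the sample tags are hypothetical.

```python
def normalize_twitter_name(name):
    """Strip the `twitter:` prefix and flatten nested names with underscores."""
    return name.replace('twitter:', '', 1).replace(':', '_')


# Hypothetical (name, content) pairs as they might appear in a page head.
tags = [
    ('twitter:card', 'summary_large_image'),
    ('twitter:title', 'The Rock'),
    ('twitter:image:src', 'http://example.com/rock.jpg'),
    ('description', 'Not a Twitter Card tag'),
]
card = {
    normalize_twitter_name(name): content
    for name, content in tags
    if name.startswith('twitter:')
}
```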

Meta tags

Parser:

# The list of meta tags which we will extract data from
# Partially taken from here - https://gist.github.com/kevinSuttle/1997924
META_TAGS_NAMES = [
    'keywords',
    'description',
    'subject',
    'copyright',
    'language',
    'Classification',
    'author',
    'pagename',
    'subtitle',
    'date',
    'syndication-source',
    'original-source']


def _get_title(soup):
    """
    Extracting the title of the page
    We take the text of the title tag or the first h1 tag
    """
    if (soup.title and soup.title.text != ''):
        return soup.title.text
    if (soup.h1 and soup.h1.text != ''):
        return soup.h1.text
    return None


def _get_description(soup):
    """
    Description of the page: the text of the first p tag following the h1,
    or just the first p tag on the page
    """
    first_h1 = soup.find('h1')
    if first_h1:
        first_p = first_h1.find_next('p')
        if first_p and first_p.text:
            return first_p.text
    first_p = soup.find('p')
    if first_p and first_p.text:
        return first_p.text
    return None


def _get_image(soup):
    """
    Page image: the nearest img sibling following the h1
    """
    first_h1 = soup.find('h1')
    if first_h1:
        first_image = first_h1.find_next_sibling('img')
        # .get avoids a KeyError for an img tag without a src attribute
        if first_image and first_image.get('src'):
            return first_image['src']
    return None


def extract_from_html(soup):
    """
    The main function of extracting data from the meta tags
    """
    meta = soup.findAll('meta')
    result = {}
    for tag in meta:
        attrs = tag.attrs
        has_required_attrs = 'name' in attrs and 'content' in attrs
        if has_required_attrs and attrs['name'] in META_TAGS_NAMES:
            result[attrs['name']] = attrs['content']
    result['title'] = _get_title(soup)
    if not result.get('description'):
        result['description'] = _get_description(soup)
    result['image'] = _get_image(soup)
    return {k: v for k, v in result.items() if v}

Retrieving data

We will fetch the data with aiohttp, an asynchronous HTTP client.

The two main functions are:

  • fetch_all - gets all the data from all sources
  • get_preview_data - gets the basic information (title, description, image) needed to create a preview

Both functions take the same parameters (only the first two are mandatory):

  • session - an aiohttp.ClientSession
  • url - the URL we want to get the data for
  • loop - an asyncio event loop object
  • oembed_providers - a list of oEmbed providers
  • oembed_params - parameters for the oEmbed query (maxwidth and maxheight)

The session object is passed in so that a single session can be shared across the whole application. From the aiohttp documentation:

Don’t create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from aiounfurl import exceptions
from aiounfurl.parsers import oembed, open_graph, meta_tags, twitter_cards


OK_STATUS_CODE = 200


async def _fetch(session, url, read_response_func='text'):
    async with session.get(url) as resp:
        if resp.status != OK_STATUS_CODE:
            msg = 'Error getting data for url {0}, status_code: {1}'.format(
                url, resp.status)
            raise exceptions.ResourceErrorResponse(msg)
        return await getattr(resp, read_response_func)()


async def _fetch_data(session, url, oembed_url_extractor=None):
    result = {}
    # we get the page HTML
    try:
        html = await _fetch(session, url)
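    # Newer aiohttp releases no longer ship aiohttp.errors;
    # ClientError is exported from the top-level package
    except aiohttp.ClientError as exc: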
        msg = 'Error getting page {0}, exception: {1}'.format(
            url, str(exc))
        raise exceptions.FetchPageException(msg)
    # we parse HTML
    soup = BeautifulSoup(html, 'html5lib')
    # If no known oEmbed provider matched the URL, try to discover
    # the oEmbed link in the HTML (see http://oembed.com/#section4)
    if oembed_url_extractor:
        oembed_url = oembed_url_extractor.get_oembed_url_from_html(soup)
        if oembed_url:
            result['oembed'] = await _fetch(
                session, oembed_url, read_response_func='json')
    # We retrieve data from HTML using parsers
    result['open_graph'] = open_graph.extract_from_html(soup)
    result['twitter_cards'] = twitter_cards.extract_from_html(soup)
    result['meta_tags'] = meta_tags.extract_from_html(soup)
    return result


async def fetch_all(session, url, loop=None, oembed_providers=None,
                    oembed_params=None):
    # we try to get a link to the oEmbed endpoint from the providers we know
    oembed_url_extractor = oembed.OEmbedURLExtractor(
        oembed_providers or [], params=oembed_params)
    oembed_url = oembed_url_extractor.get_oembed_url(url)

    result = {}
    if oembed_url:
        # If we got a link to the oEmbed endpoint, run two requests
        # concurrently: the first one to oEmbed, the second one to the
        # page itself
        tasks = [
            _fetch(session, oembed_url, read_response_func='json'),
            _fetch_data(session, url)]
        oembed_result, other_results = await asyncio.gather(
            *tasks, loop=loop, return_exceptions=True)
        raise_exceptions = (
            exceptions.ResourceErrorResponse,
            exceptions.FetchPageException)
        # If the page is not available or the server returns an error,
        # we throw an exception
        if isinstance(other_results, raise_exceptions):
            raise other_results
        # check what the oEmbed endpoint returned and record either
        # the data or the error information in the result
        if isinstance(oembed_result, dict):
            result['oembed'] = oembed_result
        elif isinstance(oembed_result, raise_exceptions):
            result['oembed'] = {
                'error': str(oembed_result)}
        result.update(other_results)
    else:
        # If no known oEmbed provider matches, we just get the page
        other_results = await _fetch_data(session, url, oembed_url_extractor)
        result.update(other_results)
    return result


async def get_preview_data(session, url, loop=None, oembed_providers=None,
                           oembed_params=None):
    data = await fetch_all(
        session,
        url,
        loop=loop,
        oembed_providers=oembed_providers,
        oembed_params=oembed_params)
    result = {'title': None, 'description': None, 'image': None}
    sources = ['oembed', 'open_graph', 'twitter_cards', 'meta_tags']
    for field in ['title', 'description']:
        for source in sources:
            result[field] = data.get(source, {}).get(field)
            if result[field]:
                break
        result[field] = result[field] or None

    # oembed image
    if data.get('oembed'):
        # .get: an oEmbed error result carries no 'type' key
        if data['oembed'].get('type') == 'photo':
            result['image'] = data['oembed'].get('url')
        elif data['oembed'].get('thumbnail_url'):
            result['image'] = data['oembed']['thumbnail_url']

    # open graph image
    if not result['image'] and data.get('open_graph'):
        image = data['open_graph'].get('image')
        if image and isinstance(image, list):
            result['image'] = image[0]
        elif image:
            result['image'] = image

    # twitter cards image
    if not result['image'] and data.get('twitter_cards'):
        result['image'] = data['twitter_cards'].get('image')

    # from meta tags
    if not result['image'] and data.get('meta_tags'):
        result['image'] = data['meta_tags'].get('image') or None
    return result
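
The error handling around the two parallel requests in fetch_all can be tried in isolation. The sketch below swaps the real HTTP calls for stand-in coroutines (all names in it are hypothetical) and uses asyncio.run, which needs Python 3.7 or newer; it only illustrates how asyncio.gather with return_exceptions=True hands back exceptions as values.

```python
import asyncio


class FakeResourceError(Exception):
    """Stand-in for the library's ResourceErrorResponse (hypothetical name)."""


async def fake_oembed_request():
    # Simulates an oEmbed endpoint that answers with an error status.
    raise FakeResourceError('status_code: 404')


async def fake_page_request():
    # Simulates a successful page download and parse.
    await asyncio.sleep(0)
    return {'open_graph': {'title': 'The Rock'}}


async def collect():
    oembed_result, page_result = await asyncio.gather(
        fake_oembed_request(), fake_page_request(), return_exceptions=True)
    # A page failure is fatal, an oEmbed failure is only recorded.
    if isinstance(page_result, Exception):
        raise page_result
    result = {}
    if isinstance(oembed_result, dict):
        result['oembed'] = oembed_result
    elif isinstance(oembed_result, FakeResourceError):
        result['oembed'] = {'error': str(oembed_result)}
    result.update(page_result)
    return result


combined = asyncio.run(collect())
```

The failed oEmbed call does not cancel the page request: its exception is returned as a value and stored under the 'error' key, next to the data parsed from the page.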

Usage example

The library can be used inside your own application, for example in a small aiohttp server.

To validate the request data I use marshmallow, a small library.

srv.py

import asyncio
import aiohttp
from marshmallow import Schema, fields, ValidationError
from aiohttp.web import Application, json_response, run_app, Response
from aiounfurl import exceptions
from aiounfurl.parsers.oembed import providers_helpers
from aiounfurl.views import get_preview_data, fetch_all


def _validate_resolution(value):
    """
    maxheight and maxwidth validation
    """
    if value < 1:
        raise ValidationError('Value must be greater than 0.')
    if value > 3000:
        raise ValidationError('Value must not be greater than 3000.')


# Schema to validate the request parameters
class RequestParamsSchema(Schema):
    url = fields.Url(required=True)
    maxwidth = fields.Integer(required=False, validate=_validate_resolution)
    maxheight = fields.Integer(required=False, validate=_validate_resolution)


async def _base_view(request, func):
    params, errors = RequestParamsSchema().load(request.rel_url.query)
    # If the data is invalid, we return an error
    if errors:
        return json_response(errors, status=400)
    app = request.app
    # we create a session for the HTTP client
    async with aiohttp.ClientSession(loop=app.loop) as session:
        oembed_params = {k: v for k, v in params.items() if k != 'url'}
        # we call fetch_all or get_preview_data
        try:
            result = await func(
                session,
                params['url'],
                loop=app.loop,
                oembed_providers=app['providers'],
                oembed_params=oembed_params)
        except exceptions.BaseAiounfurlException as exc:
            # If something goes wrong, we return an error
            return json_response({'error': str(exc)}, status=400)
        # we return the result
        return json_response(result)


async def extract(request):
    return await _base_view(request, fetch_all)


async def preview(request):
    return await _base_view(request, get_preview_data)


async def init(loop):
    app = Application(loop=loop)
    app.router.add_get('/extract', extract)
    app.router.add_get('/preview', preview)
    return app


# we get EventLoop
loop = asyncio.get_event_loop()
# Initialization of our application
app = loop.run_until_complete(init(loop))
# providers loading
app['providers'] = providers_helpers.prepare_providers(
    providers_helpers.load_providers())
# Running the application
run_app(app)

requirements.txt

aiounfurl==0.1
marshmallow==2.10.4

Run python srv.py, then you can make a GET request to the address http://127.0.0.1:8080/extract?url=your_link to get all site data or http://127.0.0.1:8080/preview?url=your_link to get the data which is necessary for the preview.
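
A request URL for the service can be built the same way as any query string. A minimal sketch, assuming the server from srv.py is listening on the default address; service_url is a hypothetical helper and maxwidth is one of the optional parameters accepted by the schema above.

```python
from urllib.parse import urlencode

SERVICE_BASE = 'http://127.0.0.1:8080'  # address from the article


def service_url(endpoint, target_url, **oembed_params):
    """Build a request URL for the /extract or /preview endpoint."""
    query = {'url': target_url}
    query.update(oembed_params)
    return '{0}/{1}?{2}'.format(SERVICE_BASE, endpoint, urlencode(query))


preview_request = service_url(
    'preview', 'https://www.instagram.com/p/BOcL9FQFKAU/', maxwidth=640)
```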

I also added an example with a GUI to the aiounfurl example repository.

Running the example in Docker

I published a Docker image with the example on http://hub.docker.com/ so that it can be run as a separate, independent service.

Running in the background:

docker run --name aiounfurl -p 8080:8080 -d tigorc/aiounfurl

Then you can open the example at http://127.0.0.1:8080/.

Using the list of oEmbed providers (a JSON file with the list of providers, /path_to_file/providers.json, has to be created beforehand):

docker run --name aiounfurl -p 8080:8080 -e "OEMBED_PROVIDERS_FILE=/srv/app/providers.json" -v /path_to_file/providers.json:/srv/app/providers.json -d tigorc/aiounfurl

The application is available on GitHub.
