Ga naar inhoud

Generic fetching

To make it easier to add data providers in the future there are a couple of helper classes that should be implemented by the class responsible for fetching the data of the provider (from now on called fetcher).

FetchableMixin and FetchableColumn

The FetchableMixin and FetchableColumn classes can be found in app/database/models/mixins/fetchable.py.

FetchableColumn

Represents a property that can be fetched from an API. This class is basically an alias for sqlalchemy.orm.Mapped but enables us to make a distinction between property that we added ourselves and properties that are fetched from an API.

FetchableMixin

Any database model that represents an API entity, should extend from this mixin. It adds the following functionality:

  • Adds SyncedAt, FirstSyncAt and DeletedAt columns to a class;
  • Allows for a custom implementation of the incremental function;
  • A covering index is created to optimize the query that counts the active records in the database.

The FirstSyncAt column is used for partitioning in Power BI and must never change.

Incremental fetching

Optionally, fetchers can make use of the IncrementalFetchProcess class located in app/integrations/incremental_fetch_process.

Temporary table

After a fetcher has fetched the data it should write the data via a temporary table. The TemporaryTable class located in app/database/temporary_table.py can be used for this purpose.

The temporary class uses two methods to write new and updated records to the database.

Direct update

For small datasets (<= 20 rows at the time of writing) we insert and update data directly into the database table. It works like this

  • Extract existing records based on PK's of the data;
  • Loop through all (new/updated/unchanged) API records;
    • Find the corresponding existing DB record;
    • If there is no existing record, insert it;
    • If there is an existing record, check if for updates and if there are any, commit them;

Since this methods does everything in memory and our application generally runs on small containers, this should not be used for larger datasets.

Writing and merging via temp table

For larger datasets we make use of two temporary tables, one has the exact same schema definition as the 'main' database table and the other has columns for all primary keys and FirstSyncAt.

Writing

There are two methods to write large datasets to the temporary table

  • Using SQLAlchemy bulk insert;
    • Make sure to enable fast_executemany on the engine;
    • Don't use it for very large datasets because this method can only 2100 values per batch (this is maximum number of parameters SQL Server accepts), for an entity with 70 columns this means we insert 30 rows at a time.
  • Using BULK INSERT via a csv in a blob storage
    • Write the dataset to a csv using Pandas;
    • Upload the csv to a Blob container in an Azure Storage Account;
    • Define the csv in the database with the CREATE EXTERNAL DATA SOURCE ... command;
    • Insert the data with the BULK INSERT [tablename] FROM [filename] WITH [DATA SOURCE] command.

Merging

The queries in this document are only for demo purposes and probably not valid SQL.

There are three tables involved in the merge process:

  • The actual database table: main
  • The temporary database table: temp
  • The FirstSyncAt database table: fs

A recurring subquery to define updated records, UPDATES, is defined as follows:

SELECT * FROM temp
    LEFT JOIN main ON main.pk1 = temp.pk1 [AND main.pk2 = temp.pk2]
    -- Option 1, use updated_at column if present (can also be named Modified, LastChange, etc.)
    WHERE main.pk1 IS NOT NULL AND temp.updated_at > main.updated_at
    -- Option 2, compare all columns
    WHERE main.pk1 IS NOT NULL AND main.col1 != temp.col1 AND main.col2 != temp.col2 AND ...

Merge steps:

  • Insert FirstSyncAt values into fs:

    INSERT INTO fs (pk1, [pk2], FirstSyncAt)
    FROM (
        SELECT pk1, [pk2], FirstSyncAt FROM `UPDATES`
    )
    

  • Remove all updates from main:

    DELETE FROM main WHERE pk1 IN (SELECT pk1 FROM `UPDATES`)
    

  • Insert new and updates records in main:

    INSERT INTO main (col1, col2, col3, ..., FirstSyncAt)
    FROM (
        SELECT col1, col2, col3, ..., COALESCE(fs.FirstSyncAt, temp.FirstSyncAt)
        FROM temp
            LEFT JOIN fs ON temp.pk1 = fs.pk1 [AND temp.pk2 = fs.pk2]
            LEFT JOIN main ON main.pk1 = temp.pk1 [AND main.pk2 = temp.pk2]
        WHERE main.pk1 IS NULL
    )
    

Best practices

Since our application generally runs on containers with not a lot of RAM it is recommended that fetched data is written to the database in batches. This way we do not need to load all data in memory before writing it to the database.