6 posts tagged

Data engineering


Collecting Data on Ad Campaigns from VK.com

We have a lot to share in today’s longread: we’ll retrieve data on ad campaigns from Vkontakte (a social network widely popular in Russia and the CIS countries) and compare it with Google Analytics data in Redash. This time we don’t need to set up a server, as our data will be transferred to Google Docs via the Google Sheets API.

Getting an Access Token
We need to create an app to receive our access token. Follow this link https://vk.com/apps?act=manage and click “Create app” on the developer’s page. Choose a name for your app and mark it as a “Standalone app”. Then click Settings in the left menu and save your app ID.

More details on access tokens can be found here: Getting an access token

Copy this link:

https://oauth.vk.com/authorize?client_id=YourClientID&scope=ads&response_type=token

And change YourClientID to your app ID; this will allow you to get information about your advertising account. Open this link in your browser and you will be redirected to another page whose URL holds your newly generated access token.

The access token expires in 86400 seconds, or 24 hours. If you want to generate a token with an unlimited lifetime, just add offline to the scope parameter. If you need to revoke a token and generate a new one, change your account password or terminate all active sessions in the security settings.
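If you’d rather not copy the token from the address bar by hand, here is a minimal Python sketch for extracting it (the redirect URL below is a made-up example; the token arrives in the URL fragment, after the # sign):

from urllib.parse import urlparse, parse_qs

# Paste the URL your browser was redirected to; this value is a made-up example.
redirect_url = 'https://oauth.vk.com/blank.html#access_token=abc123&expires_in=86400&user_id=123'

fragment = urlparse(redirect_url).fragment      # everything after '#'
token = parse_qs(fragment)['access_token'][0]
print(token)  # -> abc123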

You will also need your advertising account ID to make API requests. It can be found via this link, just copy it:  https://vk.com/ads?act=settings

Using APIs to collect data
Let’s write a script that retrieves information on all of the user’s ad campaigns: the number of impressions, clicks and costs. The script will pass this data to a DataFrame and send it to Google Docs.

from oauth2client.service_account import ServiceAccountCredentials
from pandas import DataFrame
import requests
import gspread
import time

We have several constants: the access token, the advertising account ID and the Vkontakte API version. Here we are using the most recent API version, which is 5.103.

token = 'fa258683fd418fafcab1fb1d41da4ec6cc62f60e152a63140c130a730829b1e0bc'
version = 5.103
id_rk = 123456789

To get advertising stats you need to use the ads.getStatistics method and pass your ad campaign ID to it. Since we don’t have these IDs at hand yet, we’ll first use the ads.getAds method, which returns the IDs of ads and campaigns.

Learn more about the API methods available for Vkontakte here

Use the requests library to send a request and convert the response to JSON.


campaign_ids = []
ads_ids = []
r = requests.get('https://api.vk.com/method/ads.getAds', params={
    'access_token': token,
    'v': version,
    'account_id': id_rk
})
data = r.json()['response']

The request returns a familiar list of dictionaries, similar to the one we reviewed in the previous article, “Analysing data on Facebook Ad Campaigns with Redash”.
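A small caveat: if the request fails (an expired token or a wrong account ID), VK returns an error object instead of response, and r.json()['response'] raises a KeyError. A minimal guard, as a sketch:

payload = r.json()
if 'error' in payload:
    # VK reports failures as {'error': {'error_code': ..., 'error_msg': ...}}
    raise RuntimeError(
        f"VK API error {payload['error']['error_code']}: {payload['error']['error_msg']}"
    )
data = payload['response']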

Fill in the ad_campaign_dict dictionary as follows: use the ad ID as a key and the ID of the campaign this ad belongs to as a value.

ad_campaign_dict = {}
for i in range(len(data)):
    ad_campaign_dict[data[i]['id']] = data[i]['campaign_id']

Having the ID of every ad we need, we can invoke the ads.getStatistics method to collect data on the number of impressions, clicks, costs and dates for a particular ad, so create several empty lists in advance.

ads_campaign_list = []
ads_id_list = []
ads_impressions_list = []
ads_clicks_list = []
ads_spent_list = []
ads_day_start_list = []
ads_day_end_list = []

We need to invoke the getStatistics method for each ad separately, so let’s iterate over ad_campaign_dict and send one request per ad. Retrieve all-time data by passing the ‘period’ parameter with the ‘overall’ value. Some ads may not have impressions or clicks yet if they haven’t been launched, which causes a KeyError, so let’s fall back on the try – except approach to handle this error.

for ad_id in ad_campaign_dict:
    r = requests.get('https://api.vk.com/method/ads.getStatistics', params={
        'access_token': token,
        'v': version,
        'account_id': id_rk,
        'ids_type': 'ad',
        'ids': ad_id,
        'period': 'overall',
        'date_from': '0',
        'date_to': '0'
    })
    try:
        data_stats = r.json()['response']
        for i in range(len(data_stats)):
            for j in range(len(data_stats[i]['stats'])):
                ads_impressions_list.append(data_stats[i]['stats'][j]['impressions'])
                ads_clicks_list.append(data_stats[i]['stats'][j]['clicks'])
                ads_spent_list.append(data_stats[i]['stats'][j]['spent'])
                ads_day_start_list.append(data_stats[i]['stats'][j]['day_from'])
                ads_day_end_list.append(data_stats[i]['stats'][j]['day_to'])
                ads_id_list.append(data_stats[i]['id'])
                ads_campaign_list.append(ad_campaign_dict[ad_id])
    except KeyError:
        continue
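One caveat the loop above doesn’t handle: VK limits user tokens to roughly 3 API requests per second, so with many ads you may start getting rate-limit errors. A small helper like this sketch (the 0.4-second pause is an assumption chosen to leave a margin) could wrap the calls:

import time
import requests

def vk_get(method, params, pause=0.4):
    """Call a VK API method and pause briefly to stay under the ~3 requests/second limit."""
    response = requests.get(f'https://api.vk.com/method/{method}', params=params)
    time.sleep(pause)  # assumption: 0.4 s keeps us safely below 3 requests per second
    return response.json()

The loop would then call vk_get('ads.getStatistics', params)['response'] instead of calling requests.get directly.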

Now, create a DataFrame and print out the first 5 rows:

df = DataFrame()
df['campaign_id'] = ads_campaign_list
df['ad_id'] = ads_id_list
df['impressions'] = ads_impressions_list
df['clicks'] = ads_clicks_list
df['spent'] = ads_spent_list
df['day_start'] = ads_day_start_list
df['day_end'] = ads_day_end_list
print(df.head())

Exporting Data to Google Docs
We’ll need Google API credentials: navigate to https://console.developers.google.com and create a new project (any name will do). Then go to your Dashboard and click “Enable APIs and Services”. Choose Google Drive API from the list, enable it, and do exactly the same for Google Sheets API.

After activation you will be redirected to the API control panel. Click Credentials – Create Credentials, choose Service account as the credential type and create the account. Choosing a role is optional; just proceed and specify JSON as the key type.

After these steps you can download a JSON file with your credentials; we’ll rename it to «credentials.json». On the service account page you’ll find the email field – copy this email address.

Go to https://docs.google.com/spreadsheets and create a new file named data – we’ll pass the data from our DataFrame to it – and share it with the service account email you copied earlier. Put the credentials.json file in the same directory as the script and continue coding. Add these links to the scope list:

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']

We will use the ServiceAccountCredentials.from_json_keyfile_name and gspread.authorize methods, available in the oauth2client and gspread libraries, for the authentication process. Pass your file name and the scope variable to the ServiceAccountCredentials.from_json_keyfile_name method. The sheet variable will allow us to send requests to our file in Google Docs.

creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
client = gspread.authorize(creds)
sheet = client.open('data').sheet1

Apply the update_cell method to enter a new value into a table cell. It’s worth mentioning that here the indexing starts at 1, not 0. With the first loop we’ll write the column names of our DataFrame, and with the following loops the rest of our data points. The default quota allows roughly 100 requests per 100 seconds; exceeding it causes errors and stops the script, which is why we use time.sleep to make the script pause for 1 second after each write.

count_of_rows = len(df)
count_of_columns = len(df.columns)
for i in range(count_of_columns):
    sheet.update_cell(1, i + 1, list(df.columns)[i])
for i in range(count_of_rows):
    for j in range(count_of_columns):
        sheet.update_cell(i + 2, j + 1, str(df.iloc[i, j]))
        time.sleep(1)

In case of success, you’ll get a table like this:
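Writing cell by cell with a one-second pause gets slow for larger DataFrames. As an alternative sketch (not what the script above does), gspread can fill a whole range in a single API call with range() and update_cells(); the range size here is derived from the DataFrame shape:

from gspread.utils import rowcol_to_a1

# One header row plus all data rows, written in a single batch call.
last_cell = rowcol_to_a1(count_of_rows + 1, count_of_columns)
cell_list = sheet.range(f'A1:{last_cell}')

values = list(df.columns) + [str(v) for row in df.itertuples(index=False) for v in row]
for cell, value in zip(cell_list, values):
    cell.value = value

sheet.update_cells(cell_list)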

Exporting data to Redash

See how you can connect Google Analytics to Redash in this article «How to connect Google Analytics to Redash?».

Having exported both the Google Analytics table and the Vkontakte ad campaigns table, we can compare them by writing the following query:

SELECT
    query_50.day_start,
    CASE WHEN ga_source LIKE '%vk%' THEN 'vk.com' END AS source,
    query_50.spent,
    query_50.impressions,
    query_50.clicks,
    SUM(query_49.ga_sessions) AS sessions,
    SUM(query_49.ga_newUsers) AS users
FROM query_49
JOIN query_50
ON query_49.ga_date = query_50.day_start
WHERE query_49.ga_source LIKE '%vk%' AND DATE(query_49.ga_date) BETWEEN '2020-05-16' AND '2020-05-20'
GROUP BY query_49.ga_date, source

ga_source is the traffic source from which a user was redirected. Use a CASE expression to combine everything that contains “vk” into one column called «vk.com». With the JOIN operator we add the table with ad campaign data, merging by date. Let’s take the day of the last ad campaign and a couple of days after; this results in the following output:

Takeaways
Now we have a table that shows how much was spent on ads on a given day, and the number of users who viewed the ad, engaged with it, were redirected to our website, and then completed the sign-up process.


Working with Materialized Views in Clickhouse

This time we’ll illustrate how you can pass data on Facebook ad campaigns to Clickhouse tables with Python and implement materialized views. What are materialized views, you may ask? Clickhouse is often used to handle large amounts of data, and the time spent waiting for a response from a table with raw data keeps growing. Usually we would build an ETL process or create aggregate tables to address this, but aggregate tables are not that convenient because we have to update them regularly. Clickhouse offers another way to meet the challenge: materialized views.
Materialized views store data on disk, updated according to the SELECT query that defines the view: whenever data is inserted into the source table, that SELECT transforms the inserted rows and populates the materialized view.

Setting Up Amazon EC2 instance
We need to connect the Python script we created in this article to Clickhouse. The script will make queries, so let’s open several ports. In your AWS Dashboard go to Network & Security — Security Groups. Our instance belongs to the launch-wizard-1 group. Click it and pay attention to the Inbound rules; set them as shown in this screenshot:

Setting up Clickhouse
It’s time to set up Clickhouse. Let’s edit the config.xml file using nano text editor:

cd /etc/clickhouse-server
sudo nano config.xml

Learn more about nano shortcuts here, including how to exit the editor :)

Uncomment this line:

<listen_host>0.0.0.0</listen_host>

to make your database accessible from any IP address.

Create a table and its materialized view
Open a terminal window to create our database with tables:

CREATE DATABASE db1
USE db1

We’ll refer to the same example of data collection from Facebook. The data on ad campaigns may often change and be updated; with this in mind, we want to create a materialized view that would automatically update the aggregate table containing the cost data. Our Clickhouse table will look almost the same as the DataFrame used in the previous post. We picked ReplacingMergeTree as the engine for our table: it removes duplicates that share the same sorting key:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = ReplacingMergeTree
ORDER BY (date_start, date_stop)

And then, create a materialized view:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

More details are available in the Clickhouse blog.

Unfortunately for us, Clickhouse doesn’t offer the familiar UPDATE statement, so we need a workaround. The Yandex team suggests inserting the already-stored rows again with a negative sign first, and then inserting the fresh data with sign = 1: when the view sums spend * sign, impressions * sign and clicks * sign, the old data cancels out and is effectively ignored.

Script
Let’s start writing the script and import a new library called clickhouse_driver, which allows us to make queries to Clickhouse from Python:

This is the updated version of the script from “Collecting Data on Facebook Ad Campaigns”, but it will work fine if you simply combine this code with the previous one.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

An object of the Client class enables us to make queries with the execute() method. Type in your public DNS in the host field, port 9000, specify default as the user, and a database for the connection.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

To ensure that everything works as expected, we need to write the following query that will print out names of all databases stored on the server:

client.execute('SHOW DATABASES')

In case of success the query will return this list:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

For example, we want to get data for the past three days. Create several datetime objects with the datetime library and convert them to strings using the strftime() method:

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

This query returns all table columns for a certain period:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"
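As a side note, clickhouse_driver can substitute query parameters itself instead of us formatting dates into the SQL string, which avoids quoting mistakes. A small sketch using the same table and date variables:

SQL_select_params = (
    "SELECT campaign_id, clicks, spend, impressions, date_start, date_stop, sign "
    "FROM facebook_insights "
    "WHERE date_start > %(date_from)s AND date_start < %(date_to)s"
)
rows = client.execute(
    SQL_select_params,
    {'date_from': date_start.date(), 'date_to': date_end.date()},
)

It returns the same rows as the f-string version above.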

Make the query and pass the result to old_data_list. Then replace the sign of each row with -1 and append the elements to new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Finally, here is our algorithm: insert the old data with sign = -1, run OPTIMIZE so that ReplacingMergeTree removes the duplicates, and then INSERT the new data with sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Get back to Clickhouse and run the following query to view the first 20 rows:

SELECT * FROM facebook_insights LIMIT 20

And  SELECT * FROM fb_aggregated LIMIT 20 to compare our materialized view:

Nice work! Now we have a materialized view that will be updated each time the data in the facebook_insights table changes. The trick with the sign column lets us distinguish already processed data and avoid counting it twice, while the ReplacingMergeTree engine helps us remove duplicates.


Clickhouse as a consumer for Amazon MSK

Disclaimer: this note is technical, so it may be less interesting to readers with a business background.

This blog hasn’t addressed the topic of Clickhouse yet, even though it’s one of the fastest databases out there, developed by Yandex. In short, without going into details: Clickhouse is a columnar DBMS with an extremely efficient codebase; it is described quite thoroughly in the documentation and in multiple videos on YouTube (one, two, three).

Over the last four years I’ve been using Clickhouse in my practice as an analyst and an expert in building analytical reporting. For report visualization, parameterized reports and dashboards I’ve mostly been using Redash, as it is the most convenient interface for accessing Clickhouse data.
However, just recently Looker, which I wrote about previously, added the ability to connect Clickhouse as a data source. It’s worth noting that Tableau has supported connections to Clickhouse for quite a while.

The architecture of an analytical service based on Clickhouse is predominantly a cloud one, and that was the case in the task at hand. Let’s assume you have a dedicated EC2 instance in Amazon (on which you’ve installed Clickhouse) and a separate Kafka cluster (the Amazon MSK managed solution).

The task is to connect Clickhouse as a consumer and read data from the brokers of your Kafka cluster. How to connect to a Kafka cluster is thoroughly described in the Amazon MSK documentation, so I won’t repeat that information. In my case the guide worked: the topics were created by a producer running on the machine with Clickhouse installed and were read back from it by a consumer.
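For context, a Clickhouse consumer is usually just a table with the Kafka engine. Below is a minimal sketch of what such a table might look like, created from Python with clickhouse_driver (the table name, columns, topic and broker endpoint are assumptions for illustration – the real events table from this story is not shown in the post; port 9094 is the TLS listener on MSK):

from clickhouse_driver import Client

client = Client(host='localhost')  # Clickhouse runs on the same EC2 instance

# Hypothetical consumer table; replace the broker endpoint and topic with your MSK values.
client.execute("""
    CREATE TABLE IF NOT EXISTS events (
        event_time DateTime,
        event_name String
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'b-1.your-cluster.kafka.us-east-2.amazonaws.com:9094',
             kafka_topic_list = 'events',
             kafka_group_name = 'clickhouse_consumer',
             kafka_format = 'JSONEachRow'
""")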

However, a problem arose: when connecting Clickhouse to Kafka as a consumer, the following error occurred:

020.02.02 18:01:56.209132 [ 46 ] {e7124cd5-2144-4b1d-bd49-8a410cdbd607} <Error> executeQuery: std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 20.1.2.4 (official build) (from 127.0.0.1:46586) (in query: SELECT * FROM events), Stack trace (when copying this message, always include the lines below):

For a long time I searched the Clickhouse documentation for a possible cause of this error, but in vain. The next idea was to check a local Kafka broker from the same machine. I installed a Kafka client, connected Clickhouse to it, sent data to the topic and read it back from Clickhouse without any trouble, so the Clickhouse consumer works with a local broker, meaning it works in general.

Having spoken with all my acquaintances who are experts in infrastructure and Clickhouse, we weren’t able to identify the cause of the problem right away. We checked the firewall and the network settings – everything was open. This was also confirmed by the fact that messages could be sent from the local machine to the topic of the remote broker with bin/kafka-console-producer.sh and read from there with bin/kafka-console-consumer.sh.

After that, I decided to turn to the main guru and developer of Clickhouse, Alexey Milovidov. Alexey eagerly tried to answer the questions and proposed a number of hypotheses that we checked (for instance, tracing network connections), but even after a more low-level audit we didn’t manage to localize the problem. Then Alexey recommended turning to Michail Philimonov from Altinity. Michail turned out to be an extremely responsive expert and proposed one hypothesis after another for testing (in parallel providing tips on how to test better).

As a result of our joint efforts, we discovered that the problem lies in the librdkafka library, since kafkacat, another package that uses the same library, failed to connect to the broker with the very same error (Local: timed out).

After examining the connection through bin/kafka-console-consumer.sh and its connection parameters, Michail advised adding the following line to /etc/clickhouse-server/config.xml:

<kafka><security_protocol>ssl</security_protocol></kafka>

And, oh, what a miracle! Clickhouse connected to the cluster and pulled the required data from the broker.

I hope this recipe and my experience will save you time and effort if you run into a similar problem :)

P.S. Clickhouse actually has a very friendly community and a Telegram chat where you can ask for advice and will most likely get help.


Looker Overview

Today we are going to talk about the BI platform Looker, which I had a chance to work with in 2019.

Here is a short outline of the article for quick navigation:

  1. What is Looker?
  2. Which DBMS can you connect to via Looker, and how?
  3. Building a Look ML data model
  4. Explore mode (exploring data with the model built)
  5. Building reports and saving them as Looks
  6. Examples of dashboards in Looker

What is Looker?

Creators of Looker position it as business-intelligence software and a big-data analytics platform that helps to explore, analyze and share business analytics in real time.
Looker is a really convenient tool and one of the few BI products that lets you work with pre-built data cubes in real time (actually, relational tables that are described in the Look ML model).
An engineer working with Looker needs to describe a data model in the Look ML language (something between CSS and SQL), publish this data model and then set up reports and dashboards.
Look ML itself is pretty simple; the relationships between data objects are defined by a data engineer, which then lets users work with the data without knowing SQL (to be precise, the Looker engine generates the SQL itself on the user’s behalf).

Just recently, in June 2019, Google announced the acquisition of the Looker platform for $2.6 billion.

Which DBMS can you connect to via Looker, and how?

The selection of DBMS that Looker works with is pretty wide. You can see the various connections in the screenshot below, as of October 2019:

Available DBMS for connection

You can easily set up a connection to a database via the web interface:

Web-interface of connection to DBMS

With regard to database connections, I’d like to highlight two facts. First, unfortunately, there is currently no Clickhouse support from Yandex (and none in the foreseeable future). Most likely the support won’t appear, considering that Looker was acquired by a competitor, Google.
Updated: actually, Looker has supported Clickhouse since December 2019.
The second nuisance is that you can’t build one data model that spans different DBMS. Looker has no built-in storage that could combine query results (unlike, say, Redash).
This means the analytical architecture should be built within one DBMS (preferably a fast one, or on aggregated data).

Building a Look ML data model

In order to build a report or a dashboard in Looker, you first need to set up a data model. The syntax of the Look ML language is quite thoroughly described in the documentation. Personally, I can add that describing a model doesn’t require a long learning curve for a specialist who knows SQL; rather, one needs to rethink the approach to preparing the data model. The Look ML language looks a lot like CSS:

Console of Look ML model creation

The data model defines joins between tables, keys, granularity, and which fields are facts and which are dimensions; for facts, the aggregation is specified. Obviously, you can use various IF / CASE expressions when creating a model.

Explore mode

This is probably the main killer feature of Looker, since it allows any business department to get data without involving analysts or data engineers. I guess that’s why accounts with Explore access are billed separately.

In fact, Explore mode is an interface that lets you use the configured Look ML data model, select the required metrics and dimensions, and build a customized report or visualization.
For example, we want to understand how many actions of each kind were performed in Looker’s interface last week. Using Explore mode, we select the Date field and apply a filter to it: last week (Looker is quite smart here, and typing ‘Last week’ into the filter is enough). Then we choose “Category” from the dimensions and Quantity as the metric. After pressing the Run button, the ready report is generated.

Building report in Looker

Then, using the data received in table form, you can set up a visualization of any type.
For example, a pie chart:

Applying visualization to report

Building reports and saving them as Looks

Sometimes you may want to save the data set or visualization built in Explore and share it with colleagues; for this purpose Looker has a separate entity – a Look. A Look is a ready-made report with selected filters, dimensions and facts.

Example of the saved Look

Examples of dashboards in Looker

As the collection of saved Looks grows, you often want a ready-made overview of key metrics that can be displayed on a single page.
Dashboards fit this purpose perfectly. A dashboard is created either from scratch or from previously created Looks. One of the dashboard’s “tricks” is its parameters, which are changed for the whole dashboard and can be applied to all of its Looks at once.

Interesting features, briefly

  • In Looker you can refer to other reports, and using this feature you can create a dynamic parameter that is passed via a link.
    For example, you’ve built a report with revenue split by country, and inside it you can link to a dashboard for a single country. Following the link, a user sees the dashboard for the specific country they clicked on.
  • On every Looker page there is a chat where the support service answers very promptly.
  • Looker can’t merge data across different DBMS; however, it can combine data at the level of ready Looks (in our case this feature worked really strangely).
  • While working with various models, I came across an extremely non-trivial use of SQL for counting unique values in a non-normalized data table – Looker calls this symmetric aggregates (a toy sketch of how the trick works follows after this list).
    The SQL indeed looks very non-trivial:
SELECT 
 order_items.order_id AS "order_items.order_id",
 order_items.sale_price AS "order_items.sale_price",
 (COALESCE(CAST( ( SUM(DISTINCT (CAST(FLOOR(COALESCE(users.age ,0)
 *(1000000*1.0)) AS DECIMAL(38,0))) + 
 CAST(STRTOL(LEFT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))
 * 1.0e8 + CAST(STRTOL(RIGHT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0)) ) 
 - SUM(DISTINCT CAST(STRTOL(LEFT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))
 * 1.0e8 + CAST(STRTOL(RIGHT(MD5(CONVERT(VARCHAR,users.id )),15),16) AS DECIMAL(38,0))) ) 
 AS DOUBLE PRECISION) 
 / CAST((1000000*1.0) AS DOUBLE PRECISION), 0) 
 / NULLIF(COUNT(DISTINCT CASE WHEN users.age IS NOT NULL THEN users.id 
 ELSE NULL END), 0)) AS "users.average_age"
FROM order_items AS order_items
LEFT JOIN users AS users ON order_items.user_id = users.id

GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 500
  • When purchasing Looker, the JumpStart Kit is mandatory; it costs no less than $6k. With this kit you receive support and consultation from Looker during the implementation of the tool.
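To make the symmetric aggregates trick above a bit less magical, here is a toy Python sketch of the underlying idea (all names and numbers are made up for illustration): each user’s value is tagged with a practically unique hash so that summing over distinct tagged values ignores the join fan-out, and the hash component is then subtracted back out.

# Rows produced by a fan-out join: user 1 matched two order items, so their age appears twice.
rows = [
    {"user_id": 1, "age": 30},
    {"user_id": 1, "age": 30},
    {"user_id": 2, "age": 40},
]

SCALE = 1_000_000  # plays the role of (1000000*1.0) in the generated SQL

def pseudo_hash(user_id):
    # stand-in for the MD5/STRTOL construction Looker generates
    return hash(str(user_id)) % 10**15

# SUM(DISTINCT age * SCALE + hash) keeps exactly one term per distinct user...
tagged = {row["age"] * SCALE + pseudo_hash(row["user_id"]) for row in rows}
# ...and SUM(DISTINCT hash) removes the hash component again.
hashes = {pseudo_hash(row["user_id"]) for row in rows}

total_age = (sum(tagged) - sum(hashes)) / SCALE            # 70.0, not 100.0
users_with_age = len({row["user_id"] for row in rows})     # 2
print(total_age / users_with_age)                          # 35.0 — users.average_age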