7 posts tagged

clickhouse

Clickhouse Training 101 by Altinity

Estimated read time – 5 min

I recently completed the Clickhouse training by Altinity (101 Series Training). For those who are just getting to know Clickhouse, Altinity offers a free introductory course, Data Warehouse Basics. I recommend starting with it if you plan to dive deeper.

Certification by Altinity

I would like to share my experience of the training as well as my training notes.
The training costs $500 and runs for 4 days, 2 hours per day. It is held in the evenings Moscow time (starting at 19:00 GMT+3).

Session # 1.

The first day mostly revises everything covered in Data Warehouse Basics, but it has several new ideas on how to get useful information on queries from system tables.

For example, this query shows the mutation commands issued against the ontime table and whether each one has finished.

SELECT command, is_done
FROM system.mutations
WHERE table = 'ontime'

Besides, for me it was useful to learn about column compression with the use of codecs:

ALTER TABLE ontime
 MODIFY COLUMN TailNum LowCardinality(String) CODEC(ZSTD(1))


For those who are just starting with Clickhouse, the first day will be super useful as it will help in understanding table engines and syntax for their creation, partitions, inserting data (for example directly from S3).

INSERT INTO sdata
SELECT * FROM s3(
 'https://s3.us-east-1.amazonaws.com/d1-altinity/data/sdata*.csv.gz',
 'aws_access_key_id',
 'aws_secret_access_key',
 'CSVWithNames', -- the files are gzipped CSV, not Parquet; use 'CSV' if they have no header row
 'DevId Int32, Type String, MDate Date, MDatetime DateTime, Value Float64')

Session # 2.

I found the second day the most intense and useful: in this session Robert from Altinity talks in more detail about aggregate functions and materialized views in Clickhouse (including a detailed scheme for creating materialized views).


It was super useful for me to learn about index types in Clickhouse.


Session # 3.

During the third day, colleagues share their knowledge on how to work with Kafka and JSON objects stored in the tables.
It was interesting to find out that working with arrays in Clickhouse is very similar to working with lists in Python, except that indexes start at 1; negative indexes count from the end:

WITH [1, 2, 4] AS array
SELECT
 array[1] AS First,
 array[2] AS Second,
 array[3] AS Third,
 array[-1] AS Last,
 length(array) AS Length

When working with arrays, there is a great feature called ARRAY JOIN which “unrolls” arrays to rows.
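To make the “unrolling” concrete, here is a minimal plain-Python sketch of what ARRAY JOIN does to a result set. The row layout and the column names (user, tags) are made up for illustration, and the function only mimics the semantics; it says nothing about how Clickhouse implements it.

```python
def array_join(rows, column):
    """Emit one output row per element of row[column], like ARRAY JOIN."""
    result = []
    for row in rows:
        for element in row[column]:
            unrolled = dict(row)        # copy the other columns unchanged
            unrolled[column] = element  # replace the array with one element
            result.append(unrolled)
    return result


# Hypothetical sample data: one row per user, with an array column.
rows = [
    {'user': 'alice', 'tags': ['sql', 'python']},
    {'user': 'bob', 'tags': []},
    {'user': 'carol', 'tags': ['kafka']},
]

for row in array_join(rows, 'tags'):
    print(row)
```

Note that the row with an empty array produces no output at all, which matches plain ARRAY JOIN; LEFT ARRAY JOIN is the variant that keeps such rows.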


Clickhouse allows you to efficiently interact with JSON objects stored in a table:

-- Get a JSON string value
SELECT JSONExtractString(row, 'request') AS request
FROM log_row LIMIT 3
-- Get a JSON numeric value
SELECT JSONExtractInt(row, 'status') AS status
FROM log_row LIMIT 3

The queries above extract the “request” and “status” fields of the JSON object separately. An extracted field can also be stored as a regular column:

ALTER TABLE log_row
 ADD COLUMN status Int16 DEFAULT JSONExtractInt(row, 'status');
ALTER TABLE log_row
 UPDATE status = status WHERE 1 = 1;

Session # 4.

The most difficult topic from my point of view was saved for the last day – building sharding and replication patterns and building queries on distributed Clickhouse servers.


Special respect to Altinity for an excellent collection of labs during the training.


 No comments    69   1 mon   clickhouse   sql

Collecting Social Media Data for Top ML, AI & Data Science related accounts on Instagram

Estimated read time – 9 min

Instagram is among the five most visited websites, though perhaps not in our industry. Nevertheless, we are going to test this hypothesis using Python and our data analytics skills. In this post, we will share how to collect social media data using the Instagram API.

Data collection method
The Instagram API won’t let us collect data about other platform users for no reason, but there is always a way. Try sending the following request:

https://instagram.com/leftjoin/?__a=1

The request returns a JSON object with detailed user information. For instance, we can easily get the account name, the number of posts, followers, and subscriptions, as well as the first ten user posts with their like counts, comments, etc. The pyInstagram library allows sending such requests.

SQL schema
Data will be collected into three Clickhouse tables: users, posts, comments. The users table will contain user data, such as user id, username, user’s first and last name, account description, number of followers, subscriptions, posts, comments, and likes, whether an account is verified or not, and so on.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

The posts table will be populated with the post owner name, post id, caption, comments count, and so on. To check whether a post is an advertisement, an Instagram carousel, or a video we can use these fields: is_ad, is_album and is_video.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

In the comments table, we store each comment separately with the comment owner and text.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Writing the script
Import the following classes from the library: Account, Media, WebAgent and Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Next, create an instance of the WebAgent class required for some library methods and data updating. To collect any meaningful information we need to have at least account names. Since we don’t have them yet, send the following request to search for profiles by the keywords specified in queries_list. The search results will be composed of Instagram pages that match any keyword in the list.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Let’s iterate over the keywords, collecting all matching accounts. Then remove duplicates from the obtained list by converting it to a set and back.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))
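One side effect of the set round-trip is that the original order of the accounts is lost. If the order in which the search returned them matters, dict.fromkeys removes duplicates while keeping the first occurrence of each name in place. A small sketch with made-up usernames (unique_in_order is a helper name introduced here, not part of the script):

```python
def unique_in_order(items):
    """Drop duplicates but keep the first occurrence of each item in place."""
    return list(dict.fromkeys(items))


# Made-up usernames standing in for the search results.
usernames = ['ml_daily', 'data.sci', 'ml_daily', 'ai_news', 'data.sci']
print(unique_in_order(usernames))  # ['ml_daily', 'data.sci', 'ai_news']
```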

Now we need to loop through the list of pages and request detailed information about each account that is not in the table yet. Create an instance of the Account class and pass the username as a parameter. Then update the account information using the agent.update() method. We will collect only the first 100 posts to keep things moving. Next, call the agent.get_media() method and store the posts it returns in a list named media_list.


Collecting user media data

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0
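The existence checks in this script build SQL with f-strings, which breaks as soon as a value contains a quote. clickhouse_driver can substitute parameters itself when they are passed as a dictionary alongside a query with %(name)s placeholders. Below is a hedged sketch of the same check; user_exists is a hypothetical helper name, and client is assumed to be any object with the driver’s execute(query, params) method.

```python
def user_exists(client, username):
    """Return True if the users table already has a row for this username."""
    rows = client.execute(
        'SELECT count() FROM users WHERE user_name = %(name)s',
        {'name': username},  # the driver escapes the value for us
    )
    return rows[0][0] > 0


# Usage inside the loop above (assumes `client` is the connected Client):
# if not user_exists(client, username):
#     ...
```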

Because we need to count the total number of likes and comments before adding a new user to our database, we’ll start with the posts. Almost all required fields belong to the Media class:


Collecting user posts

        for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Store comments in the variable with the same name after calling the get_comments() method:


Collecting post comments

                comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)

Now that we have obtained the user’s posts and comments, the user record can be added to the users table.


Collecting user data

        user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)
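Formatting a Python tuple straight into the INSERT statement works only as long as no value contains a quote, which is why the captions and comment texts above have their quotes stripped. clickhouse_driver also accepts the rows as a separate argument to execute, in which case it handles the encoding itself. Here is a sketch under that assumption; insert_rows is a hypothetical helper, and as far as I can tell the driver expects datetime objects rather than formatted strings for DateTime columns when rows are passed this way.

```python
from datetime import datetime


def insert_rows(client, table, rows):
    """Send a list of tuples in one INSERT and return how many were sent."""
    if not rows:
        return 0
    # `table` must come from our own code, never from user input.
    client.execute(f'INSERT INTO {table} VALUES', rows)
    return len(rows)


# Hypothetical comment row: quotes in the text need no special handling.
comment_row = (datetime(2021, 1, 1, 12, 0, 0), 1, 2, 'some_user', 'it\'s "great"')
# insert_rows(client, 'comments', [comment_row])
```

Collecting rows in a list and sending them in one call should also reduce the number of small inserts, which Clickhouse generally handles less efficiently than batches.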

Conclusion
To sum up, we have collected data on 500 users, with nearly 20K posts and 40K comments. As the database is updated over time, we can write a simple query to get today’s top 10 most followed ML, AI & Data Science accounts.

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

And as a bonus, here is a list of the most interesting Instagram accounts on this topic:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

View the code on GitHub

 No comments    244   11 mon   Analytics engineering   clickhouse   data analytics   instagram   python

Handling website buttons in Selenium

Estimated read time – 8 min

In our previous article, Parsing the data of site’s catalogue, using Beautiful Soup and Selenium, we addressed the problem of working with dynamic pages, but sometimes that method doesn’t work, as with “Show more” buttons. Today we will show how you can simulate a button click with Selenium to load a whole page, collect beer IDs and ratings, and send the data to Clickhouse.

Webpage structure

Let’s take a random brewery that has 105 check-ins, or customer reviews. One page with check-ins displays up to 25 records.

If we scroll down to the bottom, we will run into a “Show more” button that prevents us from getting all 105 records at once.

To address this, let’s find the button’s class and click it until there is nothing left to load. Selenium drives a real browser, so the next “Show more” button may not be loaded in time; that’s why we wait 2 seconds between clicks. As soon as the whole page is loaded, we take its content and parse the relevant data.
Let’s view the source code and find the button; it’s assigned to the more_checkins class.

The button has style attributes, such as display. While the button is displayed, this attribute takes the block value. But when we scroll the page to the bottom and there is nothing left to display, the attribute takes the none value and we can stop clicking.

Writing our code

Let’s import the necessary libraries:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client, errors  # errors.ServerException is caught when inserting

Selenium drives Chrome through Chromedriver, which can be downloaded from the official website.

Connect to the database and create cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

You can find out more about working with cookies in Selenium from Parsing the data of site’s catalogue, using Beautiful Soup and Selenium. We will need the untappd_user_v3_e parameter.

The pages we are going to work with can hold hundreds of thousands of records, so they are pretty heavy and our instance may get overloaded. To prevent this, we will switch off unnecessary browser features and then add the authentication cookie:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

We will need a function that takes a link, opens it in the browser, loads the whole page, and returns a soup object to be parsed. Read the button’s display attribute into the more_checkins variable and click the button until the attribute is none. Let’s wait 2 seconds between clicks to give the page time to load. As soon as we have the full page, convert it into a soup object using the bs4 library.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup
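The loop above runs forever if the button never gets hidden, and it fails with a JavaScript error if the page has no such button at all. The following sketch shows one possible way to guard against both; click_until_hidden and the max_clicks limit are names introduced here for illustration, while execute_script with extra arguments (read in JavaScript as arguments[0]) is the standard Selenium call.

```python
import time

# Reports the button's display style, or 'none' if the element is missing.
DISPLAY_JS = (
    "var el = document.getElementsByClassName(arguments[0])[0];"
    "return el ? el.style.display : 'none';"
)
CLICK_JS = "document.getElementsByClassName(arguments[0])[0].click();"


def click_until_hidden(driver, class_name, pause=2, max_clicks=1000):
    """Click the first element with class_name until it is hidden.

    Returns the number of clicks performed. max_clicks is a safety limit
    so that a page which never hides the button cannot loop forever.
    """
    clicks = 0
    while clicks < max_clicks:
        if driver.execute_script(DISPLAY_JS, class_name) == 'none':
            break
        driver.execute_script(CLICK_JS, class_name)
        clicks += 1
        time.sleep(pause)  # give the next batch of check-ins time to load
    return clicks


# Usage (assumes `driver` is the webdriver.Chrome instance created earlier):
# click_until_hidden(driver, 'more_checkins_logged')
```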

The next function takes a page URL, passes it to get_html_page, and parses the returned soup object. It returns the zipped lists of ratings and beer IDs.

See how you can use Beautiful Soup to retrieve data from a website catalogue

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Filling the lists')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)
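Slicing the last seven characters of the link assumes that every beer ID has exactly seven digits. A regular expression that captures the trailing number is a little more forgiving. This is only a sketch: extract_trailing_id is a hypothetical helper, and the URLs below are invented to resemble the links parsed above.

```python
import re

# One or more digits at the very end of the path, optionally followed by "/".
TRAILING_ID = re.compile(r'/(\d+)/?$')


def extract_trailing_id(href):
    """Return the numeric id at the end of a URL path, or None if absent."""
    match = TRAILING_ID.search(href)
    return int(match.group(1)) if match else None


# Invented URLs for illustration.
print(extract_trailing_id('https://untappd.com/b/some-brewery-some-beer/1234567'))  # 1234567
print(extract_trailing_id('https://untappd.com/brewery/42'))                        # 42
print(extract_trailing_id('https://untappd.com/TooSunnyBrewery'))                   # None
```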

Finally, write a function call for the breweries. We’ve covered how to receive a list of Russian brewery IDs in this article: Example of using dictionaries in Clickhouse with Untappd.
Let’s fetch it from the Clickhouse table.

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

If we print out brewery_list, we will see that the data is stored in a list of tuples.

Let’s flatten it with a lambda expression:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

That’s much better: brewery_list is now a flat list of IDs.
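For reference, the standard library offers the same flattening through itertools.chain.from_iterable. A tiny sketch with made-up IDs in the shape that client.execute returns:

```python
from itertools import chain

# One tuple per row, as returned by client.execute (sample values are made up).
rows = [(1,), (25,), (340,)]
flat = list(chain.from_iterable(rows))
print(flat)  # [1, 25, 340]
```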

Create a URL for each brewery in the list: it consists of a standard link with the brewery ID at the end. Pass it to the parse_html_page function, which calls get_html_page and returns the zipped lists of ratings and beer IDs. Since the lists are zipped, we can iterate through them, create a tuple, and send it to Clickhouse.

for brewery_id in brewery_list:
    print('Fetching the brewery with id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

That’s how we can handle “Show more” buttons. Over time we will build up a large dataset for further analysis, which we will work with in our next series.

 No comments    192   2020   Amazon Web Services   Analytics engineering   AWS   clickhouse   python

Example of using dictionaries in Clickhouse with Untappd

Estimated read time – 12 min

In Clickhouse we can use internal dictionaries as well as external ones; they can be an alternative to JOIN, which doesn’t always work well. Dictionaries store information in memory and can be queried with the dictGet function. Let’s review how we can create one in Clickhouse and use it in our queries.

We will illustrate this with data from the Untappd API. Untappd is a social network for everyone who loves craft beer. We are going to use check-ins of Russian craft breweries and start collecting information about them to analyze this data later on and draw some conclusions. In today’s article, we will look at how to receive metadata on Russian breweries from Untappd and store it in a Clickhouse dictionary.

Collecting data with Untappd

First off, we need to create a new app to receive the client_id and client_secret_key needed to make API calls. Follow this link and fill in the fields:

Approval usually takes about 1 to 3 weeks.

import requests
import pandas as pd
import time

We’ll be using the requests library to make API calls, view the results in a Pandas DataFrame, and save them to a CSV file before sending it to a Clickhouse dictionary. Untappd has strict limits on the number of requests and does not allow more than 100 calls per hour, which works out to one call every 36 seconds. Therefore, we make our script wait for 37 seconds between calls using the Python time module.
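The pause follows directly from the quota: an hour has 3600 seconds, so 100 calls per hour leaves 36 seconds per call, and one extra second gives a little headroom. A minimal sketch of that arithmetic (the helper name and the one-second margin are illustrative choices, not something the API prescribes):

```python
def min_delay_seconds(calls_per_hour):
    """Smallest pause between calls that stays within an hourly quota."""
    return 3600 / calls_per_hour


limit = 100                       # calls per hour, as stated above
delay = min_delay_seconds(limit)  # 36.0 seconds
safe_delay = delay + 1            # one extra second of headroom (arbitrary margin)
print(delay, safe_delay)          # 36.0 37.0
```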

client_id = 'your_client_id'
client_secret = 'your_client_secret'
all_brewery_of_russia = []

We want to get data for one thousand Russian breweries. One request to the Brewery Search method enables us to view up to 50 breweries. The website gave us 3369 breweries when searching the word “Russia” manually.

Let’s check this, scroll down to the bottom, and open the page code.

Each brewery received is stored in the beer-item class. This means we can count the number of references to beer-item:

And as it turned out, we have exactly 1000 breweries, not 3369. When searching the word “Russia” manually, the results also contain some American breweries. So, we need to make 20 calls, getting 50 breweries at a time:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('remained:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception as e:
        print(e)  # print the actual error, not the Exception class
        continue

The Brewery Search method takes several parameters: q is the search string (specify “Russia” to get the breweries based in Russia), offset shifts the search by 50 records to get the next batch of breweries, and limit caps the number of breweries returned and cannot exceed 50. Convert the response to JSON and append the data stored in the item object to the all_brewery_of_russia list.

Our data may also include breweries from other countries, so we need to filter it. Iterate through the all_brewery_of_russia list and keep only those breweries whose country_name is Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])
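The same filter can also be written by iterating over the items directly instead of indexing by count, which avoids an IndexError if count and the length of items ever disagree. The sample payload below is invented and only mirrors the keys used above; russian_breweries is a helper name introduced for this sketch.

```python
def russian_breweries(pages):
    """Collect items whose country_name is 'Russia' from all response pages."""
    return [
        item
        for page in pages
        for item in page['response']['brewery']['items']
        if item['brewery']['country_name'] == 'Russia'
    ]


# Invented sample in the shape of the API responses used above.
sample_pages = [{
    'response': {'brewery': {'count': 2, 'items': [
        {'brewery': {'brewery_id': 1, 'country_name': 'Russia'}},
        {'brewery': {'brewery_id': 2, 'country_name': 'United States'}},
    ]}}
}]
print(russian_breweries(sample_pages))
```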

Print out the first element in our brew_list:

print(brew_list[0])

Create a DataFrame with the following columns: brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, brewery_lat and brewery_lng. First, fill several lists with the data stored in brew_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
    brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

Assign them as column values:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

And view our DataFrame:

df.head()

Let’s sort the values by brewery_id and store our DataFrame as a CSV file without index column and headings:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Creating a Clickhouse dictionary

You can create Clickhouse dictionaries in many different ways. We will describe ours in an XML file, configure the server files, and access it through our client. The XML file structure will be the following:

Learn more about other ways you can create Clickhouse dictionaries in the documentation

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

name is the dictionary name, attribute holds the properties of the columns, id is the key field, and file stores the file path and format. We are going to store our file in this directory: /home/ubuntu.

Let’s upload our CSV and XML files to the server; this can be done with an FTP client such as FileZilla. We explained how to deploy Clickhouse on an Amazon instance in our previous article, and this time we need to do the same. Open your FileZilla client and go to SFTP settings to add a private key:

Connect to your server address, it can be found in the EC2 management console. Specify SFTP as a protocol, your Host, and Ubuntu as a username.

Your Public DNS may change in case of overload

After connecting we will wind up in this location /home/ubuntu. Let’s put the files in that folder and connect via SSH using Termius. Then we need to move the files to /etc/clickhouse-server to view them in Clickhouse:

Learn how you can connect to an AWS server using SSH client from our previous material Installing Clickhouse on AWS

sudo mv breweries_dictionary.xml /etc/clickhouse-server/

Go to the config file:

cd /etc/clickhouse-server
sudo nano config.xml

We need the <dictionaries_config> tag; it holds the path to a file that describes the dictionary structure. Specify the path to our XML file:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Save our file and run the Clickhouse client:

clickhouse client

Let’s check that the dictionary really loaded:

SELECT * FROM system.dictionaries\G

In case of success you will get the following:

Now, let’s write a query with the dictGet function to get the name of the brewery with ID 999. Pass in the dictionary name as the first argument, then the field name and the ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

And our query returns this:

Similarly, we could use this function to get a beer name, when the table contains only IDs.
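Conceptually, a dictionary lookup is an in-memory key lookup with a fallback value for missing keys. The following plain-Python sketch mimics that behaviour on a header-less CSV like the one we exported. It is an illustration only, not how Clickhouse stores dictionaries; the sample rows are invented, and load_flat_dictionary and dict_get are hypothetical helper names.

```python
import csv
import io

# Invented rows in the column order written by to_csv (first three columns only).
CSV_DATA = "999,12,Some Brewery\n1000,3,Another Brewery\n"
COLUMNS = ['brewery_id', 'beer_count', 'brewery_name']


def load_flat_dictionary(text):
    """Build {id: {attribute: value}} from header-less CSV text."""
    table = {}
    for record in csv.reader(io.StringIO(text)):
        row = dict(zip(COLUMNS, record))
        table[int(row.pop('brewery_id'))] = row
    return table


def dict_get(table, attribute, key, null_value=''):
    """Return the attribute for key, or null_value when the key is missing."""
    return table.get(key, {}).get(attribute, null_value)


breweries = load_flat_dictionary(CSV_DATA)
print(dict_get(breweries, 'brewery_name', 999))  # Some Brewery
print(dict_get(breweries, 'brewery_name', 5))    # prints an empty line
```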

 No comments    712   2020   Amazon Web Services   clickhouse   data analytics   Data engineering   python

Working with Materialized Views in Clickhouse

Estimated read time – 7 min

This time we’ll illustrate how you can pass data on Facebook ad campaigns to Clickhouse tables with Python and implement materialized views. What are materialized views, you may ask? Clickhouse is often used to handle large amounts of data, and the time spent waiting for a response from a table with raw data keeps growing. Usually we would address this with an ETL process or with aggregate tables, which are less convenient because we have to update them regularly. Clickhouse offers another way to meet the challenge: materialized views.
Materialized views let us store data on disk and keep it updated according to the SELECT query that defines the view. When data is inserted into the source table, the SELECT query transforms the new rows and populates the materialized view.
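The insert-time behaviour described above can be modelled in a few lines of Python. This is only a toy sketch of the idea, not how Clickhouse is implemented; the Table and SumByDateView classes and the sample values are assumptions made for illustration:

```python
from collections import defaultdict


class Table:
    """A toy source table that forwards every inserted block to attached views."""

    def __init__(self):
        self.rows = []
        self.views = []

    def insert(self, block):
        self.rows.extend(block)
        for view in self.views:
            view.on_insert(block)  # the view sees only the newly inserted rows


class SumByDateView:
    """A toy materialized view that keeps running spend totals per date."""

    def __init__(self, source):
        self.totals = defaultdict(float)
        source.views.append(self)

    def on_insert(self, block):
        for date, spend in block:
            self.totals[date] += spend


raw = Table()
view = SumByDateView(raw)
raw.insert([("2020-05-01", 10.0), ("2020-05-01", 5.0)])
raw.insert([("2020-05-02", 7.5)])
print(dict(view.totals))
```

The point to notice is that the view never rescans the raw rows: it only processes each inserted block, which is why reading the aggregate stays cheap as the raw table grows.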

Setting Up Amazon EC2 instance
We need to connect the Python script that we created in this article to Clickhouse. The script will make queries, so let’s open several ports. In your AWS Dashboard go to Network & Security > Security Groups. Our instance belongs to the launch-wizard-1 group. Click it and pay attention to the Inbound rules; you need to set them as shown in this screenshot:

Setting up Clickhouse
It’s time to set up Clickhouse. Let’s edit the config.xml file using the nano text editor:

cd /etc/clickhouse-server
sudo nano config.xml

Learn more about the shortcuts here if you didn’t get how to exit nano too :)

Uncomment this line:

<listen_host>0.0.0.0</listen_host>

to access your database from any IP address.

Create a table and its materialized view
Open a terminal window to create our database with tables:

CREATE DATABASE db1
USE db1

We’ll use the same example of collecting data from Facebook. The data on ad campaigns often changes and gets updated, so we want to create a materialized view that automatically updates the aggregate tables containing the cost data. Our Clickhouse table will look almost the same as the DataFrame used in the previous post. We picked ReplacingMergeTree as the engine for our table; it removes duplicates that share the same sorting key:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop Date,
	sign Int8
) ENGINE = ReplacingMergeTree
ORDER BY (date_start, date_stop)

And then, create a materialized view:

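-- campaign_id is included in the sorting key so that SummingMergeTree
-- does not sum rows of different campaigns that share a date_start
CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY (date_start, campaign_id)
AS
SELECT campaign_id,
       date_start,
       sum(spend * sign) AS spent,
       sum(impressions * sign) AS impressions,
       sum(clicks * sign) AS clicks
FROM facebook_insights
GROUP BY date_start, campaign_id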

More details are available in the Clickhouse blog.

Unfortunately for us, Clickhouse doesn’t include a familiar UPDATE method, so we need a workaround. The Yandex team suggests one: first re-insert the existing rows with a negative sign, and multiply by sign when aggregating. The re-inserted rows cancel out the originals, so the old data is effectively ignored when summing.
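A quick arithmetic check shows why the sign trick works. The sketch below mirrors sum(spend * sign) in plain Python; the rows and the spent helper are invented for the example:

```python
# Each row: (campaign_id, date_start, spend, sign). All values are made up.
rows = [
    (1, "2020-05-01", 10.0, 1),   # first load
    (1, "2020-05-01", 10.0, -1),  # the same row re-inserted with sign = -1
    (1, "2020-05-01", 12.5, 1),   # corrected figure from a later load
]


def spent(rows, campaign_id, date_start):
    """Plain-Python equivalent of sum(spend * sign) for one campaign and day."""
    return sum(
        spend * sign
        for cid, day, spend, sign in rows
        if cid == campaign_id and day == date_start
    )


print(spent(rows, 1, "2020-05-01"))
```

The first two rows add up to zero, so only the corrected value of 12.5 remains in the total.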

Script
Let’s start writing the script by importing a new library called clickhouse_driver. It lets us make queries to Clickhouse from Python:

We are using the updated version of the script from “Collecting Data on Facebook Ad Campaigns”. But it will work fine if you just combine this code with the previous one.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

An object of the Client class enables us to make queries with the execute() method. Type in your public DNS in the host field, set the port to 9000, specify default as the user, and name a database for the connection.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port=9000, database='db1')

To ensure that everything works as expected, we need to write the following query that will print out names of all databases stored on the server:

client.execute('SHOW DATABASES')

In case of success the query will return this list:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Suppose we want to get data for the past three days. Create several datetime objects with the datetime library and convert them to strings using the strftime() method:

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

This query returns all table columns for a certain period:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"
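As an alternative to formatting the dates into the string ourselves, clickhouse_driver can substitute parameters written as %(name)s when a dictionary is passed as the second argument of execute(). The sketch below assumes that behaviour; the fetch_period helper and the RecordingClient stand-in are our own names, added so the example runs without a server:

```python
from datetime import date

SQL_SELECT = (
    "SELECT campaign_id, clicks, spend, impressions, date_start, date_stop, sign "
    "FROM facebook_insights "
    "WHERE date_start > %(start)s AND date_start < %(end)s"
)


def fetch_period(client, start, end):
    """Run the period query and let the driver substitute and escape the dates."""
    return client.execute(SQL_SELECT, {"start": start, "end": end})


class RecordingClient:
    """Stand-in for clickhouse_driver.Client; it only records what it is asked."""

    def __init__(self):
        self.calls = []

    def execute(self, query, params=None):
        self.calls.append((query, params))
        return []  # a real client would return a list of row tuples


demo = RecordingClient()
result = fetch_period(demo, date(2020, 5, 1), date(2020, 5, 3))
print(demo.calls[0][1])
```

With the real client from above the call would look like fetch_period(client, date_start.date(), date_end.date()).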

Run the query and store the result in old_data_list. Then replace the sign of each row with -1 and append the rows to new_data_list:

new_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)  # rows come back as tuples, so copy them into mutable lists
    elem[-1] = -1      # sign is the last column in the SELECT
    new_data_list.append(elem)

Finally, write our algorithm: insert the data with sign = -1, run OPTIMIZE so that ReplacingMergeTree removes duplicates, and INSERT the new data with sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)
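If the per-row loop turns out to be slow, one possible variation is to build all new rows first and send them in a single INSERT. This is a sketch under our own assumptions, not the script from the post: build_rows, refresh and RecordingClient are hypothetical names, the lists mirror the insight_*_list variables above, and the stand-in client only records calls so the example runs without a server:

```python
from datetime import date, datetime

INSERT_SQL = "INSERT INTO facebook_insights VALUES"
OPTIMIZE_SQL = "OPTIMIZE TABLE facebook_insights"


def to_date(text):
    """Convert a 'YYYY-MM-DD' string into a date object."""
    return datetime.strptime(text, "%Y-%m-%d").date()


def build_rows(campaign_ids, clicks, spend, impressions, date_starts, date_stops):
    """Zip the per-column lists into row lists with sign = 1."""
    return [
        [cid, c, s, imp, to_date(start), to_date(stop), 1]
        for cid, c, s, imp, start, stop in zip(
            campaign_ids, clicks, spend, impressions, date_starts, date_stops
        )
    ]


def refresh(client, negated_rows, new_rows):
    """Insert the cancelling rows, then the fresh rows, one batch each."""
    if negated_rows:
        client.execute(INSERT_SQL, negated_rows)
        client.execute(OPTIMIZE_SQL)
    if new_rows:
        client.execute(INSERT_SQL, new_rows)
        client.execute(OPTIMIZE_SQL)


class RecordingClient:
    """Stand-in for clickhouse_driver.Client; it only records the calls."""

    def __init__(self):
        self.calls = []

    def execute(self, query, params=None):
        self.calls.append((query, params))


demo = RecordingClient()
new_rows = build_rows(
    [1, 2], [3, 4], [2.5, 1.0], [100, 200],
    ["2020-05-01", "2020-05-01"], ["2020-05-01", "2020-05-01"],
)
refresh(demo, [], new_rows)
print(len(demo.calls))
```

Skipping the INSERT when a list is empty avoids sending an empty batch, and OPTIMIZE runs once per batch instead of once per row.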

Get back to Clickhouse and run the following query to view the first 20 rows:
SELECT * FROM facebook_insights LIMIT 20

Then run SELECT * FROM fb_aggregated LIMIT 20 to compare it with our materialized view:

Nice work! Now we have a materialized view that is updated each time new data is inserted into the facebook_insights table. The trick with the sign column lets us cancel out already processed data so that it is not summed twice, while the ReplacingMergeTree engine helps us remove duplicates.

 No comments    660   2020   Amazon Web Services   Analytics engineering   AWS   clickhouse   data analytics   python