Ingest GitHub Events in Real-Time into Lakehouse Tables Using Python Tasks

The https://api.github.com/events API returns public GitHub events in near real time, typically delayed by about five minutes. Combine this near-real-time data with the offline data synced as files from the gharchive website. This improves data freshness from hourly to minute-level granularity, which better serves applications that need fresher data. This example also shows how to write data to a table with the bulkload API of the Singdata Lakehouse Python connector.

Create the Target Table

Execute the following table creation statement in a SQL script task node.

-- Target table for near-real-time GitHub events.
-- The Python job below upserts rows keyed on `id`.
-- It fills minute/second and the DATE/HOUR partitions from created_at shifted to UTC+8.
CREATE TABLE IF NOT EXISTS `github_timeline_realtime_events` (
    `id`                STRING,
    `type`              STRING,
    `repo_id`           STRING,
    `repo_name`         STRING,
    `minute`            STRING,
    `second`            STRING,
    `created_at`        TIMESTAMP,
    -- Full event payload as returned by the API, serialized to JSON.
    `data`              STRING COMMENT 'record data, json format',
    `__last_modified__` STRING COMMENT '__last_modified__'
)
PARTITIONED BY (DATE STRING, HOUR STRING)
COMMENT 'public GitHub timeline record,real time events, ingested from https://api.github.com/events';

Write the Python Code

import requests,json

import time

import pytz

import pandas as pd

from requests.exceptions import RequestException

from datetime import datetime, timedelta

from clickzetta import connect

from clickzetta.bulkload.bulkload_enums import BulkLoadOptions,BulkLoadOperation,BulkLoadCommitOptions

def get_lakehouse_connect():
    """Open and return a new connection to the Singdata Lakehouse.

    The connection targets the ``gharchive`` workspace, ``public`` schema and
    ``default`` virtual cluster. Before running, fill in the empty
    credential/instance fields and replace ``<region_id>`` in the service host.

    Each call creates a brand-new connection. Nothing here closes it, so that
    is left to the caller.
    """
    connection_settings = {
        'username': '',
        'password': '',
        'service': '<region_id>.api.singdata.com',
        'instance': '',
        'workspace': 'gharchive',
        'schema': 'public',
        'vcluster': 'default',
    }
    return connect(**connection_settings)

def get_lakehouse_queries_data_hints(sql_query, query_tag):
    """Run ``sql_query`` on a fresh Lakehouse connection and return the rows as a DataFrame.

    Args:
        sql_query: SQL text to execute.
        query_tag: Value sent as the ``query_tag`` hint. It is passed to
            ``cursor.execute`` as ``parameters={"hints": {"query_tag": ...}}``.

    Returns:
        A ``pandas.DataFrame`` whose columns are named after
        ``cursor.description``. An empty DataFrame is returned when the
        statement produced no result set (``cursor.description`` is ``None``).

    The connection opened here is always closed before returning, including
    when the query raises.
    """
    conn = get_lakehouse_connect()
    try:
        cursor = conn.cursor()
        my_param = {"hints": {"query_tag": query_tag}}
        cursor.execute(sql_query, parameters=my_param)
        # Statements without a result set (DDL/DML) report no description;
        # iterating over None would raise TypeError.
        if cursor.description is None:
            return pd.DataFrame()
        columns = [column[0] for column in cursor.description]
        return pd.DataFrame(cursor.fetchall(), columns=columns)
    finally:
        # Previously every call leaked a connection.
        try:
            conn.close()
        except Exception as close_error:
            print(f"Failed to close Singdata Lakehouse connection: {close_error}")

def get_data(url, headers, params, retry_times=5, intervals=(10, 30, 60, 300, 600), timeout=30):
    """GET ``url`` from the GitHub API, retrying transient failures with backoff.

    Args:
        url: Endpoint to request.
        headers: Request headers, e.g. ``Authorization`` and an optional
            ``If-None-Match``.
        params: Query-string parameters, or ``None``.
        retry_times: Maximum number of attempts.
        intervals: Seconds to sleep after each failed attempt. The last value
            is reused if there are more attempts than intervals.
        timeout: Per-request timeout in seconds, so a stalled connection
            cannot block the caller forever.

    Returns:
        The ``requests.Response`` for any non-error status (including
        ``304 Not Modified``, which ``raise_for_status`` does not treat as an
        error). Returns ``None`` when every attempt failed, or when the server
        returned a client error that retrying cannot fix.
    """
    for attempt in range(retry_times):
        # Reset on every attempt: requests.get can raise before assigning, and
        # the except block must not inspect a stale response from a previous
        # attempt. (Previously this raised UnboundLocalError on network errors.)
        response = None
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()
            return response
        except RequestException as e:
            print(f"Github API request {url} failed, attempt {attempt+1}: {e}")
            status = response.status_code if response is not None else None
            # A 4xx other than rate limiting (GitHub uses 403/429) will fail
            # the same way again, so give up immediately. Network errors and
            # 5xx fall through to the retry below.
            if status is not None and 400 <= status < 500 and status not in (403, 429):
                return None
            if attempt < retry_times - 1 and intervals:
                time.sleep(intervals[min(attempt, len(intervals) - 1)])
    return None

            

def _build_event_row(writer, event):
    """Convert one GitHub API event dict into a bulkload row created by ``writer``.

    ``event['created_at']`` is parsed with the format ``%Y-%m-%dT%H:%M:%SZ``
    and shifted by a fixed +8 hours. The shifted time fills ``created_at``,
    the ``date``/``hour`` partition columns, ``minute``, ``second`` and
    ``__last_modified__``. The whole event is stored as JSON in ``data``.
    """
    event_json = json.dumps(event)
    created_at_utc = datetime.strptime(event['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    created_at_e8 = created_at_utc + timedelta(hours=8)

    row = writer.create_row()
    row.set_value('id', event['id'])
    row.set_value('type', event['type'])
    # repo_id is declared STRING in the table, but the API payload's repo id
    # may be numeric, so convert explicitly.
    row.set_value('repo_id', str(event['repo']['id']))
    row.set_value('repo_name', event['repo']['name'])
    row.set_value('date', created_at_e8.strftime('%Y-%m-%d'))
    row.set_value('hour', created_at_e8.strftime('%H'))
    row.set_value('minute', created_at_e8.strftime('%M'))
    row.set_value('second', created_at_e8.strftime('%S'))
    row.set_value('created_at', created_at_e8)
    row.set_value('data', event_json)
    # __last_modified__ is a STRING column; write the shifted time as text.
    row.set_value('__last_modified__', created_at_e8.strftime('%Y-%m-%d %H:%M:%S'))
    return row


def bulk_load_data(real_time_events):
    """Upsert a batch of GitHub events into ``public.github_timeline_realtime_events``.

    A bulkload stream is opened with ``record_keys=['id']`` and
    ``BulkLoadOperation.UPSERT``, so an event id that is sent again replaces
    the existing row.

    Args:
        real_time_events: Event dicts as returned by the GitHub events API.

    Errors are printed, not raised (best effort). The Lakehouse connection
    is closed in every case.
    """
    conn = None
    try:
        conn = get_lakehouse_connect()
        bulkload_stream = conn.create_bulkload_stream(
            schema='public',
            table='github_timeline_realtime_events',
            record_keys=['id'],
            operation=BulkLoadOperation.UPSERT,
        )
        if not bulkload_stream:
            # Previously execution continued and failed on an undefined `writer`.
            print("Failed to create a bulkload stream; no events were written.")
            return
        writer = bulkload_stream.open_writer(0)
        print("Successfully connected to the Singdata Lakehouse")

        for event in real_time_events:
            writer.write(_build_event_row(writer, event))

        writer.close()
        bulkload_stream.commit()
        # Count the batch that was actually passed in, not a module-level global.
        print(f"{len(real_time_events)} events have been written to Singdata Lakehouse.")
    except Exception as e:
        print("Error while connecting to Singdata Lakehouse,Exception is ", e)
    finally:
        # The original only printed this message; now the connection is really closed.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_error:
                print(f"Failed to close Singdata Lakehouse connection: {close_error}")
        print("finally Singdata Lakehouse connection is closed")

# Polling loop: page through https://api.github.com/events, de-duplicate
# events by id, and flush them to the Lakehouse in batches.

EVENTS_URL = 'https://api.github.com/events'
# Flush once at least this many new events are buffered. This is checked
# after the last page of each pass.
FLUSH_THRESHOLD = 700
# How many recently seen event ids to keep after a flush. Consecutive polls
# return overlapping pages, so forgetting every id at flush time would
# re-buffer events that were just written.
MAX_SEEN_EVENT_IDS = 10000
# Pause after each full pass over the pages. GitHub advertises its preferred
# interval in the X-Poll-Interval response header; 0 keeps the
# poll-as-fast-as-possible behaviour.
POLL_SLEEP_SECONDS = 0
# Pause after get_data() gives up, so repeated failures do not busy-loop.
FAILURE_BACKOFF_SECONDS = 10

# ETag from the last 200 response of each URL. It is replayed as
# If-None-Match for that same URL, so an unchanged page comes back as 304.
etags = {}
# Events buffered since the last flush.
events = []
# Event ids already buffered or flushed. A dict preserves insertion order,
# which lets the oldest ids be trimmed.
seen_event_ids = {}
response = None
headers = {
    # Replace with your GitHub token, e.g. 'Bearer <your token>'.
    'Authorization': 'please replace your github token'
}
params = {'per_page': 100}
url = EVENTS_URL

while True:
    # Build per-request headers instead of mutating the shared dict.
    request_headers = dict(headers)
    if url in etags:
        request_headers['If-None-Match'] = etags[url]

    # `next` links already carry their own query string (per_page, page).
    request_params = params if url == EVENTS_URL else None
    response = get_data(url, headers=request_headers, params=request_params)

    if response is None:
        print(f'Request to {url} failed, retrying in {FAILURE_BACKOFF_SECONDS}s.')
        time.sleep(FAILURE_BACKOFF_SECONDS)
        continue

    remaining = response.headers.get('X-RateLimit-Remaining')
    print(f"X-RateLimit-Remaining: {remaining}")
    etag_header = response.headers.get('ETag')
    print(f"ETag: {etag_header}")

    if response.status_code == 200:
        if etag_header:
            etags[url] = etag_header
        for event in response.json():
            event_id = event.get('id')
            if event_id not in seen_event_ids:
                events.append(event)
                seen_event_ids[event_id] = True
        print(f'Event count: {len(events)}')
    elif response.status_code == 304:
        print('response.status_code == 304, no new events.')
    else:
        print(f'response.status_code is {response.status_code}')

    if 'next' in response.links:
        # More pages remain in this pass.
        url = response.links['next']['url']
        continue

    # Last page reached: the next pass starts again from the first page.
    url = EVENTS_URL
    if len(events) >= FLUSH_THRESHOLD:
        bulk_load_data(events)
        events.clear()
        # Keep the most recent ids rather than resetting, so overlapping
        # pages on the next poll are still recognised as already seen.
        if len(seen_event_ids) > MAX_SEEN_EVENT_IDS:
            recent_ids = list(seen_event_ids)[-MAX_SEEN_EVENT_IDS:]
            seen_event_ids = dict.fromkeys(recent_ids, True)
    if POLL_SLEEP_SECONDS:
        time.sleep(POLL_SLEEP_SECONDS)

Run the Task

Because the script runs in an infinite loop, clicking Run starts a long-running Python node task. It keeps fetching the latest GitHub events and writing them to the Singdata Lakehouse table until you cancel it manually.