Use Python Tasks to Fetch GitHub Events in Real-Time and Import into Lakehouse Table
The https://api.github.com/events endpoint provides near-real-time access to public GitHub event data (events become visible with roughly a five-minute delay). Compared with synchronizing offline data files from the gharchive website, pulling data through this API improves data timeliness from hourly to minute-level, which better serves applications with stricter data-freshness requirements. This example also demonstrates how to use Singdata Lakehouse's Python connector to bulkload data into a table.
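The core freshness mechanism is conditional polling of the Events API: the client sends the ETag from the previous response in an If-None-Match header, and GitHub answers 304 Not Modified when there are no new events. The minimal sketch below (with a placeholder token) illustrates just this polling loop; the full task script later in this section uses the same pattern together with pagination and bulkload.

import time

import requests

# Minimal sketch of ETag-based conditional polling against the GitHub Events API.
# <YOUR_GITHUB_TOKEN> is a placeholder; replace it with a personal access token.
url = 'https://api.github.com/events'
headers = {'Authorization': 'Bearer <YOUR_GITHUB_TOKEN>'}
etag = None

for _ in range(3):  # poll a few times for demonstration
    if etag:
        headers['If-None-Match'] = etag
    response = requests.get(url, headers=headers, params={'per_page': 100})
    if response.status_code == 200:
        etag = response.headers.get('ETag')
        events = response.json()
        print(f'Fetched {len(events)} events, ETag={etag}')
    elif response.status_code == 304:
        print('No new events since the last poll.')
    # GitHub suggests a minimum poll interval via the X-Poll-Interval header
    time.sleep(int(response.headers.get('X-Poll-Interval', 60)))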
Create Target Table
Execute the following table creation statement in the SQL script task node.
CREATE TABLE IF NOT EXISTS `github_timeline_realtime_events` (
    `id` STRING,
    `type` STRING,
    `repo_id` STRING,
    `repo_name` STRING,
    `minute` STRING,
    `second` STRING,
    `created_at` TIMESTAMP,
    `data` STRING COMMENT 'record data, json format',
    `__last_modified__` STRING COMMENT '__last_modified__'
)
PARTITIONED BY (`date` STRING, `hour` STRING)
COMMENT 'public GitHub timeline record, real time events, ingested from https://api.github.com/events';
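Alternatively, the same DDL can be submitted from Python through the Lakehouse connector, which is the connector the ingestion script below relies on. This is only a sketch: the connection parameters are placeholders, and create_github_timeline_realtime_events.sql is a hypothetical file holding the statement above.

from clickzetta import connect

# Placeholder connection parameters; fill in your own credentials and instance.
conn = connect(
    username='<username>',
    password='<password>',
    service='api.clickzetta.com',
    instance='<instance>',
    workspace='gharchive',
    schema='public',
    vcluster='default')

cursor = conn.cursor()
# Hypothetical file containing the CREATE TABLE statement shown above.
ddl = open('create_github_timeline_realtime_events.sql').read()
cursor.execute(ddl)
print('Table github_timeline_realtime_events is ready.')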
Writing Python Code
import json
import time
from datetime import datetime, timedelta

import pandas as pd
import pytz
import requests
from requests.exceptions import RequestException

from clickzetta import connect
from clickzetta.bulkload.bulkload_enums import BulkLoadOptions, BulkLoadOperation, BulkLoadCommitOptions


def get_lakehouse_connect():
    conn = connect(
        username='',
        password='',
        service='api.clickzetta.com',
        instance='',
        workspace='gharchive',
        schema='public',
        vcluster='default')
    return conn


def get_lakehouse_queries_data_hints(sql_query, query_tag):
    conn = get_lakehouse_connect()
    # Execute SQL with a query_tag hint
    cursor = conn.cursor()
    my_param = dict()
    my_param["hints"] = dict()
    my_param["hints"]["query_tag"] = query_tag
    cursor.execute(sql_query, parameters=my_param)
    df = pd.DataFrame(cursor.fetchall(), columns=[i[0] for i in cursor.description])
    return df


def get_data(url, headers, params):
    retry_times = 5
    intervals = [10, 30, 60, 300, 600]
    for i in range(retry_times):
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response
        except RequestException as e:
            print(f"Github API request {url} failed, attempt {i+1}: {e}")
            if i < retry_times - 1:
                time.sleep(intervals[i])
            else:
                return None


def bulk_load_data(real_time_events):
    try:
        bulkload_stream_conn = get_lakehouse_connect()
        bulkload_stream = bulkload_stream_conn.create_bulkload_stream(
            schema='public',
            table='github_timeline_realtime_events',
            record_keys=['id'],
            operation=BulkLoadOperation.UPSERT)
        if bulkload_stream:
            writer = bulkload_stream.open_writer(0)
            print("Successfully connected to the ClickZetta Lakehouse")
            for event in real_time_events:
                event_json = json.dumps(event)
                event_dict = json.loads(event_json)
                # print(f"event_dict is:\n\n{event_dict}")
                row = writer.create_row()
                row.set_value('id', event_dict['id'])
                row.set_value('type', event_dict['type'])
                row.set_value('repo_id', event_dict['repo']['id'])
                row.set_value('repo_name', event_dict['repo']['name'])
                # Convert created_at from UTC to UTC+8 and derive partition columns
                created_at_utc = datetime.strptime(event_dict['created_at'], '%Y-%m-%dT%H:%M:%SZ')
                created_at_e8 = created_at_utc + timedelta(hours=8)
                # Extract date (string)
                date_e8 = created_at_e8.strftime('%Y-%m-%d')
                # Extract hour (string)
                hour_e8 = created_at_e8.strftime('%H')
                # Extract minute (string)
                minute_e8 = created_at_e8.strftime('%M')
                # Extract second (string)
                second_e8 = created_at_e8.strftime('%S')
                row.set_value('date', date_e8)
                row.set_value('hour', hour_e8)
                row.set_value('minute', minute_e8)
                row.set_value('second', second_e8)
                row.set_value('created_at', created_at_e8)
                row.set_value('data', event_json)
                row.set_value('__last_modified__', created_at_e8)
                writer.write(row)
            writer.close()
            bulkload_stream.commit()
            print(f"{len(real_time_events)} events have been written to ClickZetta Lakehouse.")
    except Exception as e:
        print("Error while connecting to ClickZetta Lakehouse, Exception is ", e)
    finally:
        print("finally ClickZetta Lakehouse connection is closed")


# tz = pytz.timezone('Asia/Shanghai')

# Initialize ETag and event list
etag = None
events = []
# Create a dictionary to store unique event IDs
seen_event_ids = {}
response = None
headers = {
    'Authorization': 'Bearer <your GitHub token>'  # replace with your personal access token
}
params = {'per_page': 100}
url = 'https://api.github.com/events'

while True:
    if etag:
        headers['If-None-Match'] = etag
    response = get_data(url, headers=headers, params=params)
    if response is not None:
        remaining = response.headers.get('X-RateLimit-Remaining')
        print(f"X-RateLimit-Remaining: {remaining}")
        ETag = response.headers.get('ETag')
        print(f"ETag: {ETag}")
        # Check response status
        if response.status_code == 200:
            # Update ETag
            etag = response.headers.get('ETag')
            # Get events and add to list
            new_events = response.json()
            for event in new_events:
                event_id = event.get('id')
                if event_id not in seen_event_ids:
                    events.append(event)
                    seen_event_ids[event_id] = True
            # Print event count
            print(f'Event count: {len(events)}')
        elif response.status_code == 304:
            print('response.status_code == 304, no new events.')
        else:
            print(f'response.status_code is {response.status_code}')
        # Sleep according to GitHub's X-Poll-Interval header
        # sleep_time = int(response.headers.get('X-Poll-Interval', 0))
        sleep_time = 0
        # Check if the 'Link' header has a 'next' page; if not, this is the last page
        if 'next' not in response.links:
            url = 'https://api.github.com/events'
            # time.sleep(sleep_time)
            if len(events) >= 700:
                bulk_load_data(events)
                events.clear()
                seen_event_ids = {}
        else:
            # Update URL to the next page link
            url = response.links['next']['url']
Running Tasks
Since this task runs in an infinite loop, after clicking "Run" it will stay resident in the Python node, continuously fetching the latest events from GitHub and writing them into the Lakehouse table until it is manually canceled.
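While the task is running, a quick aggregation against the target table confirms that events are arriving and shows how fresh the latest record is. The snippet below is a sketch that reuses the same connection parameters as the ingestion script; the credentials and instance values are placeholders.

import pandas as pd
from clickzetta import connect

# Placeholder connection parameters; use the same values as in the ingestion script.
conn = connect(
    username='<username>',
    password='<password>',
    service='api.clickzetta.com',
    instance='<instance>',
    workspace='gharchive',
    schema='public',
    vcluster='default')

cursor = conn.cursor()
# Count ingested events per partition and show the latest event time per hour.
cursor.execute("""
    SELECT `date`, `hour`, COUNT(*) AS event_count, MAX(created_at) AS latest_event
    FROM github_timeline_realtime_events
    GROUP BY `date`, `hour`
    ORDER BY `date` DESC, `hour` DESC
    LIMIT 24
""")
df = pd.DataFrame(cursor.fetchall(), columns=[col[0] for col in cursor.description])
print(df)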