main.py
import logging

import pandas_gbq
from google.cloud.bigquery import Client

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def count_apps(df):
    """
    Aggregate the number of apps per location (on-prem or GCP) and environment (dev or prod).

    :param df: pd.DataFrame: The dataframe the function is applied to
    :return: df: pd.DataFrame: Table with the app count per date, datacenter and environment
    """
    df['antall_apper'] = 1
    # Extract the environment (dev or prod) from the cluster name
    df['env'] = 'dev'
    df.loc[df['cluster'].str.contains('prod'), 'env'] = 'prod'
    # Extract the location (on-prem or GCP) from the cluster name
    df['datacenter'] = 'on_prem'
    df.loc[df['cluster'].str.contains('gcp'), 'datacenter'] = 'gcp'
    df = df.groupby(['dato', 'env', 'datacenter'])['antall_apper'].count().reset_index()
    return df
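
# Illustrative sketch (not part of the pipeline): the sample below uses
# made-up cluster names to show how count_apps collapses raw app rows
# into one row per (dato, env, datacenter):
#
#   import pandas as pd
#   sample = pd.DataFrame({
#       'dato': ['2024-01-01'] * 3,
#       'cluster': ['prod-gcp', 'dev-gcp', 'prod-fss'],
#   })
#   count_apps(sample)
#   #          dato   env datacenter  antall_apper
#   # 0  2024-01-01   dev        gcp             1
#   # 1  2024-01-01  prod        gcp             1
#   # 2  2024-01-01  prod    on_prem             1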


def load_data(df, project_id, destination_table):
    """
    Write the aggregated dataframe to BigQuery, replacing the destination table.

    :param df: pd.DataFrame: The aggregated table produced by count_apps
    :param project_id: str: GCP project to bill and write to
    :param destination_table: str: Fully qualified table id (project.dataset.table)
    """
    table_schema = [
        {'name': 'dato', 'type': 'DATE'},
        {'name': 'env', 'type': 'STRING'},
        {'name': 'datacenter', 'type': 'STRING'},
        {'name': 'antall_apper', 'type': 'INTEGER'}
    ]
    pandas_gbq.to_gbq(
        dataframe=df,
        destination_table=destination_table,
        table_schema=table_schema,
        project_id=project_id,
        if_exists='replace',
        progress_bar=False
    )
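
# Note: if_exists='replace' makes each run a full refresh of the target
# table, and the explicit table_schema keeps the BigQuery column types
# stable across loads instead of letting pandas_gbq infer them.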


def run_etl():
    """Run the full extract-transform-load job for the app counts."""
    project_id = 'nais-analyse-prod-2dcc'
    destination_table = f'{project_id}.apps_aggregated.apps_per_env'
    source_table = 'aura-prod-d7e3.dataproduct_apps.dataproduct_apps_unique_v3'
    client = Client(project=project_id)

    # Extract
    logging.info('Reading data from source...')
    query = f'SELECT * FROM `{source_table}`'
    df = client.query(query).to_dataframe()
    logging.info(f'{len(df)} rows read from source')

    # Transform
    df = count_apps(df)

    # Load
    logging.info('Writing data to target...')
    load_data(df, project_id, destination_table)
    logging.info(f'{len(df)} rows written to target')


if __name__ == '__main__':
    run_etl()
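
# Running this assumes Google Cloud credentials are available in the
# environment (for example Application Default Credentials via
# `gcloud auth application-default login`); both the BigQuery client and
# pandas_gbq can pick those up by default.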