forked from GoogleCloudPlatform/professional-services
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbigquery.py
125 lines (114 loc) · 5.08 KB
/
bigquery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import Output, NotConfiguredException
import re
from google.cloud import bigquery
class InvalidJobOptionException(Exception):
    """Raised when a BigQuery load job option is not recognized."""
class BigqueryOutput(Output):
    """Output processor that loads data from GCS into a BigQuery table.

    Expects ``self.output_config`` to contain:
      - ``datasetWithTable``: destination table (``dataset.table``), jinja-expanded
      - ``source``: GCS URI of the data to load, jinja-expanded
      - ``location``: BigQuery dataset location, jinja-expanded
      - ``job``: load-job options (camelCase keys, see ``job_field_type``)
      - ``project`` (optional): GCP project to bill/run the job in
    """

    def output(self):
        """Run a synchronous BigQuery load job from the configured GCS source.

        Raises:
            NotConfiguredException: if a required configuration key is missing.
            InvalidJobOptionException: if ``job`` contains an unknown option.
        """
        if 'datasetWithTable' not in self.output_config:
            raise NotConfiguredException(
                'No destination dataset specified in BigQuery output.')
        if 'source' not in self.output_config:
            raise NotConfiguredException(
                'No GCS source specified in BigQuery output.')
        if 'location' not in self.output_config:
            raise NotConfiguredException(
                'No dataset location specified in BigQuery output.')
        if 'job' not in self.output_config:
            raise NotConfiguredException(
                'No load job location specified in BigQuery output.')

        project = self.output_config.get('project')
        bigquery_client = bigquery.Client(
            client_info=self._get_grpc_client_info(), project=project)

        # Known load-job options and how their configured values must be
        # expanded (each option is jinja-templated before use).
        job_field_type = {
            'projectionFields': 'list',
            'schema': 'dict',
            'schemaUpdateOptions': 'list',
            'timePartitioning': 'dict',
            'rangePartitioning': 'dict',
            'clustering': 'dict',
            'destinationEncryptionConfiguration': 'dict',
            'hivePartitioningOptions': 'dict',
            'useAvroLogicalTypes': 'bool',
            'allowQuotedNewlines': 'bool',
            'allowJaggedRows': 'bool',
            # Fixed: was 'bool,' (stray comma), which silently dropped the
            # option because no expansion branch matched the type string.
            'ignoreUnknownValues': 'bool',
            'autodetect': 'bool',
            'decimalTargetTypes': 'list',
            'parquetOptions': 'dict',
            'destinationTableDescription': 'str',
            'destinationTableFriendlyName': 'str',
            'nullMarker': 'str',
            'quoteCharacter': 'str',
            'labels': 'dict',
            'sourceFormat': 'str',
            'encoding': 'str',
            'writeDisposition': 'str',
            'createDisposition': 'str',
            'maxBadRecords': 'int',
            'skipLeadingRows': 'int'
        }
        # Map camelCase option names to the snake_case field names that the
        # BigQuery API representation expects.
        job_field_map = {
            camel_name: re.sub(r'(?<!^)(?=[A-Z])', '_', camel_name).lower()
            for camel_name in job_field_type
        }

        # 'job' is guaranteed present by the check above.
        job_config = {}
        for k, v in self.output_config['job'].items():
            if k not in job_field_map:
                raise InvalidJobOptionException('Unknown job option "%s"' %
                                                k)
            field = job_field_map[k]
            field_type = job_field_type[k]
            if field_type == 'str':
                job_config[field] = self._jinja_expand_string(v)
            elif field_type == 'list':
                job_config[field] = self._jinja_var_to_list(v)
            elif field_type == 'dict':
                job_config[field] = self._jinja_expand_dict(v)
            elif field_type == 'bool':
                job_config[field] = self._jinja_expand_bool(v)
            elif field_type == 'int':
                job_config[field] = self._jinja_expand_int(v)

        bq_job_config = bigquery.job.LoadJobConfig.from_api_repr(
            {'load': job_config})

        table = self._jinja_expand_string(
            self.output_config['datasetWithTable'])
        location = self._jinja_expand_string(self.output_config['location'])
        source = self._jinja_expand_string(self.output_config['source'])

        self.logger.info('BigQuery load job starting...',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'job_config': job_config,
                         })
        load_job = bigquery_client.load_table_from_uri(
            source,
            table,
            location=location,
            job_config=bq_job_config,
        )
        # Block until the load job completes (raises on job failure).
        load_job.result()
        self.logger.info('BigQuery load job finished.',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'output_rows': load_job.output_rows,
                             'output_bytes': load_job.output_bytes,
                             'errors': load_job.errors,
                         })