Skip to content

Commit dfb3f87

Browse files
authored
[yaml] : add jinja inheritance example (#37601)
* add jinja files for inheritance * add support for new example * add license
1 parent b9d48fa commit dfb3f87

5 files changed

Lines changed: 188 additions & 4 deletions

File tree

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,11 @@ def _wordcount_minimal_test_preprocessor(
563563
return _wordcount_random_shuffler(test_spec, all_words, env)
564564

565565

566-
@YamlExamplesTestSuite.register_test_preprocessor(
567-
['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
566+
@YamlExamplesTestSuite.register_test_preprocessor([
567+
'test_wordCountInclude_yaml',
568+
'test_wordCountImport_yaml',
569+
'test_wordCountInheritance_yaml'
570+
])
568571
def _wordcount_jinja_test_preprocessor(
569572
test_spec: dict, expected: List[str], env: TestEnvironment):
570573
"""
@@ -679,6 +682,7 @@ def _kafka_test_preprocessor(
679682
'test_anomaly_scoring_yaml',
680683
'test_wordCountInclude_yaml',
681684
'test_wordCountImport_yaml',
685+
'test_wordCountInheritance_yaml',
682686
'test_iceberg_to_alloydb_yaml'
683687
])
684688
def _io_write_test_preprocessor(
@@ -1256,8 +1260,11 @@ def _batch_log_analysis_test_preprocessor(
12561260
return test_spec
12571261

12581262

1259-
@YamlExamplesTestSuite.register_test_preprocessor(
1260-
['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
1263+
@YamlExamplesTestSuite.register_test_preprocessor([
1264+
'test_wordCountInclude_yaml',
1265+
'test_wordCountImport_yaml',
1266+
'test_wordCountInheritance_yaml'
1267+
])
12611268
def _jinja_preprocessor(raw_spec_string: str, test_name: str):
12621269
"""
12631270
Preprocessor for Jinja-based YAML tests.

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ def word_count_jinja_template_data(test_name: str) -> list[str]:
8686
'apache_beam/yaml/examples/transforms/jinja/'
8787
'import/macros/wordCountMacros.yaml'
8888
]
89+
elif test_name == 'test_wordCountInheritance_yaml':
90+
return [
91+
'apache_beam/yaml/examples/transforms/jinja/'
92+
'inheritance/base/base_pipeline.yaml'
93+
]
8994
return []
9095

9196

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Jinja Inheritance Example
21+
22+
This folder contains an example of how to use Jinja2 inheritance in Beam YAML pipelines.
23+
24+
## Files
25+
26+
* **base/base_pipeline.yaml**: The base WordCount pipeline template (Read -> Split -> Explode -> `extra_steps` block -> MapToFields -> Write). It does not count words on its own; instead it defines an empty Jinja block named `extra_steps` between `Explode` and the final `MapToFields` (Format output) so that child pipelines can inject additional transforms at that point.
27+
* **wordCountInheritance.yaml**: Extends `base/base_pipeline.yaml` and overrides the `extra_steps` block with a `Combine` transform ("Count words") that groups by word and aggregates the per-word values, completing the WordCount pipeline.
28+
29+
## Running the Example
30+
31+
To run the child pipeline (which consists of the inherited base pipeline logic plus the injected `Combine` transform):
32+
33+
General setup:
34+
```sh
35+
export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/inheritance/wordCountInheritance.yaml
36+
export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
37+
export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"
38+
export PROJECT="MY-PROJECT"
39+
export REGION="MY-REGION"
40+
41+
cd <PATH_TO_BEAM_REPO>/beam/sdks/python
42+
```
43+
44+
Multiline Run Example:
45+
```sh
46+
python -m apache_beam.yaml.main \
47+
--project=${PROJECT} \
48+
--region=${REGION} \
49+
--yaml_pipeline_file="${PIPELINE_FILE}" \
50+
--jinja_variables='{
51+
"readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
52+
"mapToFieldsSplitConfig": {
53+
"language": "python",
54+
"fields": {
55+
"value": "1"
56+
}
57+
},
58+
"explodeTransform": {"fields": "word"},
59+
"combineTransform": {
60+
"group_by": "word",
61+
"combine": {"value": "sum"}
62+
},
63+
"mapToFieldsCountConfig": {
64+
"language": "python",
65+
"fields": {"output": "word + \" - \" + str(value)"}
66+
},
67+
"writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
68+
}'
69+
```
70+
71+
Single Line Run Example:
72+
```sh
73+
python -m apache_beam.yaml.main --project=${PROJECT} --region=${REGION} \
74+
--yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform":
75+
{"path": "'"${KINGLEAR}"'"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}'
76+
```
77+
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
pipeline:
19+
type: chain
20+
transforms:
21+
- type: ReadFromText
22+
config:
23+
path: {{readFromTextTransform.path}}
24+
25+
- type: MapToFields
26+
name: Split words
27+
config:
28+
language: python
29+
fields:
30+
word:
31+
callable: |-
32+
import re
33+
def my_mapping(row):
34+
return re.findall(r'[A-Za-z\']+', row.line.lower())
35+
value: {{mapToFieldsSplitConfig.fields.value}}
36+
- type: Explode
37+
config:
38+
fields:
39+
- {{explodeTransform.fields}}
40+
41+
# Inheritance injection point: content added here by child pipelines will be executed
42+
# after Explode and before the final MapToFields ("Format output").
43+
{% block extra_steps %}
44+
{% endblock %}
45+
46+
- type: MapToFields
47+
name: Format output
48+
config:
49+
language: {{mapToFieldsCountConfig.language}}
50+
fields:
51+
output: {{mapToFieldsCountConfig.fields.output}}
52+
- name: Write to GCS
53+
type: WriteToText
54+
config:
55+
path: {{writeToTextTransform.path}}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
{% extends "apache_beam/yaml/examples/transforms/jinja/inheritance/base/base_pipeline.yaml" %}
19+
20+
{% block extra_steps %}
21+
- name: Count words
22+
type: Combine
23+
config:
24+
group_by:
25+
- {{combineTransform.group_by}}
26+
combine:
27+
value: {{combineTransform.combine.value}}
28+
{% endblock %}
29+
30+
# Expected:
31+
# Row(output='king - 311')
32+
# Row(output='lear - 253')
33+
# Row(output='dramatis - 1')
34+
# Row(output='personae - 1')
35+
# Row(output='of - 483')
36+
# Row(output='britain - 2')
37+
# Row(output='france - 32')
38+
# Row(output='duke - 26')
39+
# Row(output='burgundy - 20')
40+
# Row(output='cornwall - 75')

0 commit comments

Comments
 (0)