@@ -719,13 +719,22 @@ def test(self, providers=providers): # default arg to capture loop value
719719 ** yaml_transform .SafeLineLoader .strip_metadata (
720720 fixture .get ('config' , {}))))
721721 for pipeline_spec in spec ['pipelines' ]:
722- with beam .Pipeline (options = PipelineOptions (
723- pickle_library = 'cloudpickle' ,
724- ** replace_recursive (yaml_transform .SafeLineLoader .strip_metadata (
725- pipeline_spec .get ('options' , {})),
726- vars ))) as p :
727- yaml_transform .expand_pipeline (
728- p , replace_recursive (pipeline_spec , vars ))
722+ try :
723+ with beam .Pipeline (options = PipelineOptions (
724+ pickle_library = 'cloudpickle' ,
725+ ** replace_recursive (yaml_transform .SafeLineLoader .strip_metadata (
726+ pipeline_spec .get ('options' , {})),
727+ vars ))) as p :
728+ yaml_transform .expand_pipeline (
729+ p , replace_recursive (pipeline_spec , vars ))
730+ except ValueError as exn :
731+ # FnApiRunner currently does not support this requirement in
732+ # some xlang scenarios (e.g. Iceberg YAML pipelines).
733+ if 'beam:requirement:pardo:on_window_expiration:v1' in str (exn ):
734+ self .skipTest (
735+ 'Runner does not support '
736+ 'beam:requirement:pardo:on_window_expiration:v1' )
737+ raise
729738
730739 yield f'test_{ suffix } ' , test
731740
0 commit comments