diff --git a/knip.json b/knip.json index 6789a3bf0..70e847bce 100644 --- a/knip.json +++ b/knip.json @@ -5,8 +5,7 @@ "src/api/**", "src/components/ui/**", "src/config/announcements.ts", - "src/components/shared/BetaFeatureWrapper/BetaFeatureWrapper.tsx", - "src/utils/publicAsset.ts" + "src/components/shared/BetaFeatureWrapper/BetaFeatureWrapper.tsx" ], "ignoreDependencies": [ "@tanstack/react-query-devtools", diff --git a/public/example-pipelines/Intro-Data Flow.pipeline.component.png b/public/example-pipelines/Intro-Data Flow.pipeline.component.png new file mode 100644 index 000000000..643913dac Binary files /dev/null and b/public/example-pipelines/Intro-Data Flow.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml b/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml new file mode 100644 index 000000000..569110164 --- /dev/null +++ b/public/example-pipelines/Intro-Data Flow.pipeline.component.yaml @@ -0,0 +1,908 @@ +name: 'Intro: Simple Pipeline' +description: | + The simplest Tangle pipeline — five tasks in a straight line. Generates a synthetic regression dataset, splits into train/test, trains a linear regression model, predicts on the test set, and evaluates with standard metrics (MAE, RMSE, R²). All components use only Python stdlib — no external dependencies. +metadata: + annotations: + flex-nodes: '[{"id":"note-simple","properties":{"title":"Simple Pipeline","content":"Five tasks wired in sequence via taskOutput references. Each task uses a lightweight Python component with no external dependencies. 
Data flows left-to-right: dataset → split → model → predictions → metrics.","color":"#E3F2FD"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":320,"height":130},"position":{"x":200,"y":-120},"zIndex":0}]' + editor.flow-direction: left-to-right +implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. 
+ """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. + - name: test_data + type: CSV + description: Output CSV for the test split. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). 
+ random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate Data + 
train_fraction: '0.8' + annotations: + editor.position: '{"x": 350, "y": 100}' + Train: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data 
+ type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: target + training_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x": 700, "y": 100}' + Predict: + componentRef: + name: Predict + digest: 4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. 
+ + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. 
+ + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + taskOutput: + outputName: model + taskId: Train + test_data: + taskOutput: + outputName: test_data + taskId: Split + annotations: + editor.position: '{"x": 1050, "y": 100}' + Evaluate: + 
componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: 
predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions 
CSV.\n\nExpects columns: actual, predicted. Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x": 1400, "y": 100}' + Generate Data: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: '500' + random_seed: '42' + annotations: + editor.position: '{"x": 0, "y": 100}' diff --git a/public/example-pipelines/Intro-Hello World.pipeline.component.png b/public/example-pipelines/Intro-Hello World.pipeline.component.png new file mode 100644 index 000000000..a2e3f55a8 Binary files /dev/null and b/public/example-pipelines/Intro-Hello World.pipeline.component.png differ diff --git 
a/public/example-pipelines/Intro-Hello World.pipeline.component.yaml b/public/example-pipelines/Intro-Hello World.pipeline.component.yaml new file mode 100644 index 000000000..53ee193db --- /dev/null +++ b/public/example-pipelines/Intro-Hello World.pipeline.component.yaml @@ -0,0 +1,138 @@ +name: Hello World +description: A simple pipeline that demonstrates the hello world component. +inputs: + - name: name + type: String + description: The name to greet. + default: '' + annotations: + editor.position: '{"x":140,"y":0}' + value: '' + optional: false +outputs: + - name: greeting + type: Text + description: The generated greeting message. + annotations: + editor.position: '{"x": 850, "y": 0}' +implementation: + graph: + tasks: + Greet: + componentRef: + name: Hello world + digest: 8cf1bcc31ce9cd9be6982a7ba95cf233f23efcb7769842b6b03e917ad5dfdd0a + spec: + name: Hello world + description: A simple hello world component that generates a greeting. + metadata: + annotations: + git_local_sha: 674a9b6ea9fbea311b91860128047d8644a2086e + git_local_branch: js-experiment_with_tangle_deploy_mcp-1209 + git_relative_dir: oasis/generated/jordan_stern + cloud_pipelines.net: 'true' + component_yaml_path: hello_world.component.yaml + python_original_code: | + from cloud_pipelines import components + + + def hello_world( + name: str, + greeting_output: components.OutputPath("Text"), + greeting_prefix: str = "Hello", + ): + """A simple hello world component that generates a greeting. + + Args: + name: The name to greet. + greeting_output: Output file containing the greeting message. + greeting_prefix: Prefix for the greeting (default: Hello). + """ + greeting = f"{greeting_prefix}, {name}! Welcome to Tangle." 
+ print(greeting) + with open(greeting_output, "w") as f: + f.write(greeting) + python_original_code_path: ../../modules/jordan_stern/hello_world.py + components new regenerate python-function-component: 'true' + inputs: + - name: name + type: String + description: The name to greet. + - name: greeting_prefix + type: String + description: 'Prefix for the greeting (default: Hello).' + default: Hello + optional: true + outputs: + - name: greeting_output + type: Text + description: Output file containing the greeting message. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + def hello_world( + name, + greeting_output, + greeting_prefix = "Hello", + ): + """A simple hello world component that generates a greeting. + + Args: + name: The name to greet. + greeting_output: Output file containing the greeting message. + greeting_prefix: Prefix for the greeting (default: Hello). + """ + greeting = f"{greeting_prefix}, {name}! Welcome to Tangle." 
+ print(greeting) + with open(greeting_output, "w") as f: + f.write(greeting) + + import argparse + _parser = argparse.ArgumentParser(prog='Hello world', description='A simple hello world component that generates a greeting.') + _parser.add_argument("--name", dest="name", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--greeting-prefix", dest="greeting_prefix", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--greeting-output", dest="greeting_output", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = hello_world(**_parsed_args) + args: + - '--name' + - inputValue: name + - if: + cond: + isPresent: greeting_prefix + then: + - '--greeting-prefix' + - inputValue: greeting_prefix + - '--greeting-output' + - outputPath: greeting_output + arguments: + name: + graphInput: + inputName: name + annotations: + editor.position: '{"x":430,"y":0}' + editor.collapsed: 'true' + outputValues: + greeting: + taskOutput: + outputName: greeting_output + taskId: Greet +metadata: + annotations: + editor.flow-direction: left-to-right + notes: Enter a name in the Input Node or configure it when submitting the pipeline. 
diff --git a/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png new file mode 100644 index 000000000..9ad8ca831 Binary files /dev/null and b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml new file mode 100644 index 000000000..5863a12f2 --- /dev/null +++ b/public/example-pipelines/Intro-Input and Output Nodes.pipeline.component.yaml @@ -0,0 +1,977 @@ +name: 'Intro: Input & Output Nodes' +description: | + Pipeline-level inputs and outputs. Inputs parameterise the pipeline so the same YAML can be re-run with different settings (dataset size, train/test split ratio, target column). Outputs expose the trained model and evaluation metrics at the pipeline boundary. +metadata: + annotations: + flex-nodes: '[{"id":"note-inputs","properties":{"title":"Pipeline Inputs","content":"Inputs appear as configurable parameters when launching a run. Tasks consume them via graphInput wiring — change the input values, change the behaviour, same pipeline.","color":"#E8F5E9"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":250,"height":130},"position":{"x":0,"y":-150},"zIndex":0},{"id":"note-outputs","properties":{"title":"Pipeline Outputs","content":"outputValues wire task results to the pipeline boundary. 
Downstream pipelines or the UI can read them directly without digging into task internals.","color":"#FFF3E0"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":250,"height":130},"position":{"x":1500,"y":-150},"zIndex":0}]' + editor.flow-direction: left-to-right +inputs: + - name: Num Rows + type: Integer + description: Number of data points to generate + default: '500' + annotations: + editor.position: '{"x":17.5,"y":320}' + value: '500' + - name: Train Fraction + type: Float + description: Fraction of data for training (remainder is test) + default: '0.8' + annotations: + editor.position: '{"x":360,"y":0}' + value: '0.8' + - name: Target Column + type: String + description: Name of the column to predict + default: target + annotations: + editor.position: '{"x":763.5,"y":525.75}' + value: target + - name: Random Seed + type: Integer + description: Seed for reproducible data generation and splitting + default: '42' + annotations: + editor.position: '{"x":5,"y":120}' + value: '42' +outputs: + - name: Trained Model + type: JSON + description: Model parameters (weights, bias) as JSON + annotations: + editor.position: '{"x":1500,"y":498.75}' + - name: Predictions + type: CSV + description: Actual vs predicted values + annotations: + editor.position: '{"x":1915.5,"y":376.5}' + - name: Metrics + type: JSON + description: Regression metrics — MAE, RMSE, R², max error + annotations: + editor.position: '{"x":2230,"y":168}' +implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. 
+ metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: split_csv.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import random + + + def split_csv( + input_data: components.InputPath("CSV"), + train_data: components.OutputPath("CSV"), + test_data: components.OutputPath("CSV"), + train_fraction: float = 0.8, + random_seed: int = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + python_original_code_path: split_csv.py + components new regenerate python-function-component: 'true' + inputs: + - name: input_data + type: CSV + description: Input CSV file. + - name: train_fraction + type: Float + description: Fraction of rows for training (0.0 to 1.0). + default: '0.8' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducible shuffling. + default: '42' + optional: true + outputs: + - name: train_data + type: CSV + description: Output CSV for the training split. + - name: test_data + type: CSV + description: Output CSV for the test split. 
+ implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import random + + def split_csv( + input_data, + train_data, + test_data, + train_fraction = 0.8, + random_seed = 42, + ): + """Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. + Both output files keep the same header row. + + Args: + input_data: Input CSV file. + train_data: Output CSV for the training split. + test_data: Output CSV for the test split. + train_fraction: Fraction of rows for training (0.0 to 1.0). + random_seed: Seed for reproducible shuffling. + """ + random.seed(random_seed) + + with open(input_data, "r") as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + random.shuffle(rows) + split_idx = int(len(rows) * train_fraction) + train_rows = rows[:split_idx] + test_rows = rows[split_idx:] + + for path, subset in [(train_data, train_rows), (test_data, test_rows)]: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(subset) + + print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test") + + import argparse + _parser = argparse.ArgumentParser(prog='Split csv', description='Split a CSV dataset into train and test sets.\n\nRandomly shuffles rows, then splits by the given fraction.\nBoth output files keep the same header row.') + _parser.add_argument("--input-data", dest="input_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--train-fraction", dest="train_fraction", type=float, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, 
default=argparse.SUPPRESS) + _parser.add_argument("--train-data", dest="train_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--test-data", dest="test_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = split_csv(**_parsed_args) + args: + - '--input-data' + - inputPath: input_data + - if: + cond: + isPresent: train_fraction + then: + - '--train-fraction' + - inputValue: train_fraction + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--train-data' + - outputPath: train_data + - '--test-data' + - outputPath: test_data + arguments: + input_data: + taskOutput: + outputName: output_data + taskId: Generate Data + random_seed: + graphInput: + inputName: Random Seed + train_fraction: + graphInput: + inputName: Train Fraction + annotations: + editor.position: '{"x":710,"y":53.5}' + Train: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). 
The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. + """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + 
print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data + type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: + graphInput: + inputName: Target Column + training_data: + taskOutput: + outputName: train_data + taskId: Split + annotations: + editor.position: '{"x":1090,"y":480.25}' + Predict: + componentRef: + name: Predict + digest: 4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. 
+ + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. 
+ + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. + """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + taskOutput: + outputName: model + taskId: Train + 
test_data: + taskOutput: + outputName: test_data + taskId: Split + annotations: + editor.position: '{"x":1470,"y":269.75}' + Evaluate: + componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. 
+ """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. 
+ """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions CSV.\n\nExpects columns: actual, predicted. 
Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x":1850,"y":183.5}' + Generate Data: + componentRef: + name: Generate dataset + digest: 7f837011088acc8e081f5f2ae5c981cc3bb73ed28bf4b2aea3134bc5297e1674 + spec: + name: Generate dataset + description: |- + Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: generate_dataset.component.yaml + python_original_code: | + from cloud_pipelines import components + import random + import csv + import math + + + def generate_dataset( + output_data: components.OutputPath("CSV"), + num_rows: int = 500, + random_seed: int = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + python_original_code_path: generate_dataset.py + components new regenerate python-function-component: 'true' + inputs: + - name: num_rows + type: Integer + description: Number of rows to generate. + default: '500' + optional: true + - name: random_seed + type: Integer + description: Seed for reproducibility. + default: '42' + optional: true + outputs: + - name: output_data + type: CSV + description: Output CSV file path. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import random + import csv + import math + + def generate_dataset( + output_data, + num_rows = 500, + random_seed = 42, + ): + """Generate a synthetic regression dataset with 4 features and a target. + + Creates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target. + The target is a noisy linear combination of the features, suitable for + demonstrating regression workflows. + + Args: + output_data: Output CSV file path. + num_rows: Number of rows to generate. + random_seed: Seed for reproducibility. 
+ """ + random.seed(random_seed) + + weights = [1.5, -2.0, 0.8, 3.2] + bias = 5.0 + noise_scale = 0.5 + + with open(output_data, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["feature_1", "feature_2", "feature_3", "feature_4", "target"]) + + for _ in range(num_rows): + features = [random.gauss(0, 1) for _ in range(4)] + target = bias + sum(w * x for w, x in zip(weights, features)) + target += random.gauss(0, noise_scale) + writer.writerow([round(x, 4) for x in features] + [round(target, 4)]) + + import argparse + _parser = argparse.ArgumentParser(prog='Generate dataset', description='Generate a synthetic regression dataset with 4 features and a target.\n\nCreates a CSV with columns: feature_1, feature_2, feature_3, feature_4, target.\nThe target is a noisy linear combination of the features, suitable for\ndemonstrating regression workflows.') + _parser.add_argument("--num-rows", dest="num_rows", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--random-seed", dest="random_seed", type=int, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-data", dest="output_data", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = generate_dataset(**_parsed_args) + args: + - if: + cond: + isPresent: num_rows + then: + - '--num-rows' + - inputValue: num_rows + - if: + cond: + isPresent: random_seed + then: + - '--random-seed' + - inputValue: random_seed + - '--output-data' + - outputPath: output_data + arguments: + num_rows: + graphInput: + inputName: Num Rows + random_seed: + graphInput: + inputName: Random Seed + annotations: + editor.position: '{"x":330,"y":289.5}' + outputValues: + Metrics: + taskOutput: + outputName: metrics + taskId: Evaluate + Predictions: + taskOutput: + outputName: predictions + taskId: Predict + Trained Model: + taskOutput: + outputName: model + taskId: Train diff --git 
a/public/example-pipelines/Intro-Multinode.pipeline.component.png b/public/example-pipelines/Intro-Multinode.pipeline.component.png new file mode 100644 index 000000000..9cbab2fa8 Binary files /dev/null and b/public/example-pipelines/Intro-Multinode.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Multinode.pipeline.component.yaml b/public/example-pipelines/Intro-Multinode.pipeline.component.yaml new file mode 100644 index 000000000..cce923f84 --- /dev/null +++ b/public/example-pipelines/Intro-Multinode.pipeline.component.yaml @@ -0,0 +1,169 @@ +name: 'Intro: Multinode' +description: | + Demonstrates Tangle's multi-node feature: a single task expanded into N parallel + pods that share a Kubernetes Job and discover each other via launcher-injected + rendezvous metadata. Each pod prints its rank, then non-zero ranks open a TCP + connection to rank 0 to confirm the address resolves. Uses only the public + python:3.12-slim image and the Python standard library, so it works on both + Shopify's Oasis deployment and any open-source TangleML / cloud-pipelines.net + deployment. + + Defaults to 2 pods. To change the pod count, edit the literal value of the + `tangleml.com/launchers/kubernetes/multi_node/number_of_nodes` annotation on + the `Multinode Greeter` task below — it must be a string literal because + Kubernetes annotations don't support runtime references. +metadata: + annotations: + flex-nodes: '[{"id":"note-multinode","properties":{"title":"Multi-node Feature","content":"One task, N pods, one Kubernetes Job. The launcher injects WORLD_SIZE, RANK, and the rank-0 DNS address into each pod via dynamicData. Rank 0''s output is what Tangle records as the task output — other ranks'' outputs are dropped. 
Scale the pod count by editing the multi_node annotation on the task (literal string only — annotations cannot reference pipeline inputs).","color":"#E3F2FD"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"river-tangent"},"size":{"width":340,"height":180},"position":{"x":40,"y":-220},"zIndex":0},{"id":"note-multinode-prereqs","properties":{"title":"How it works","content":"Required: the multi_node annotation on the task and the three dynamicData inputs (number_of_nodes, node_index, node_0_address). Optional but typical for real distributed training: IPC_LOCK capability, shared_memory, and a dynamic_volume PVC so all pods share a filesystem. This demo skips those — it only needs TCP connectivity to prove the rendezvous works.","color":"#FFF3E0"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"river-tangent"},"size":{"width":340,"height":200},"position":{"x":420,"y":-220},"zIndex":0}]' + editor.flow-direction: left-to-right + cloned_from_run_id: 019e50eb18ef20c6ce6b +inputs: [] +outputs: [] +implementation: + graph: + tasks: + Print Summary: + componentRef: + name: Print Summary + spec: + name: Print Summary + description: Echoes the rank-0 summary line so it appears in the run output. + inputs: + - name: summary + type: String + outputs: + - name: echoed + type: String + implementation: + container: + image: python:3.12-slim + command: + - sh + - '-ec' + - | + # $0 is the first positional arg (inputPath), $1 the second (outputPath) + cat "$0" + mkdir -p "$(dirname "$1")" + cp "$0" "$1" + - inputPath: summary + - outputPath: echoed + arguments: + summary: + taskOutput: + outputName: summary + taskId: Multinode Greeter + annotations: + editor.position: '{"x": 600, "y": 100}' + Multinode Greeter: + componentRef: + name: Multinode Greeter + spec: + name: Multinode Greeter + description: | + Runs once per pod in a multi-node task. Prints rank, world size, and + the master address. 
Non-zero ranks open a TCP connection to rank 0 to + prove the launcher-injected DNS address actually resolves. Rank 0 + listens for one connection per worker, then writes a summary. + metadata: + annotations: + cloud_pipelines.net: 'true' + inputs: + - name: world_size + type: String + description: Total number of pods (WORLD_SIZE). Injected by the launcher. + - name: node_rank + type: String + description: This pod's rank in [0, world_size). Injected by the launcher. + - name: master_addr + type: String + description: DNS address of rank 0. Injected by the launcher. + outputs: + - name: summary + type: String + description: Per-rank summary line. + implementation: + container: + image: python:3.12-slim + command: + - python3 + - '-u' + - '-c' + - | + import os, socket, sys, time + + world_size = int(sys.argv[1]) + rank = int(sys.argv[2]) + master_addr = sys.argv[3] + summary_path = sys.argv[4] + port = 29500 + hostname = socket.gethostname() + + print(f"[rank={rank}/{world_size}] hostname={hostname} master_addr={master_addr}", flush=True) + + if rank == 0: + # Rank 0: bind and accept one connection per worker. + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srv.bind(("0.0.0.0", port)) + srv.listen(max(world_size, 1)) + srv.settimeout(120.0) + peers = [] + print(f"[rank=0] listening on :{port} for {world_size - 1} peers", flush=True) + for _ in range(world_size - 1): + conn, addr = srv.accept() + peer = conn.recv(64).decode("utf-8").strip() + peers.append(peer) + conn.sendall(b"ack\n") + conn.close() + print(f"[rank=0] greeted peer {peer} from {addr}", flush=True) + srv.close() + summary = f"rank=0 world_size={world_size} peers={peers}" + else: + # Other ranks: dial rank 0 and announce ourselves. 
+ deadline = time.time() + 120.0 + last_err = None + while time.time() < deadline: + try: + s = socket.create_connection((master_addr, port), timeout=10) + s.sendall(f"rank={rank}\n".encode("utf-8")) + ack = s.recv(64).decode("utf-8").strip() + s.close() + print(f"[rank={rank}] handshake ok ack={ack!r}", flush=True) + break + except Exception as e: + last_err = e + time.sleep(2.0) + else: + raise SystemExit(f"[rank={rank}] failed to reach {master_addr}:{port}: {last_err}") + summary = f"rank={rank} world_size={world_size} master_addr={master_addr}" + + print(summary, flush=True) + + # Only rank 0 materializes the task output. The shared mount across + # pods is racy if multiple ranks write the same file, and Tangle + # treats rank 0's output as the canonical one. Other ranks exit cleanly. + if rank == 0: + os.makedirs(os.path.dirname(summary_path), exist_ok=True) + with open(summary_path, "w") as f: + f.write(summary + "\n") + args: + - inputValue: world_size + - inputValue: node_rank + - inputValue: master_addr + - outputPath: summary + arguments: + node_rank: + dynamicData: + system/multi_node/node_index: {} + world_size: + dynamicData: + system/multi_node/number_of_nodes: {} + master_addr: + dynamicData: + system/multi_node/node_0_address: {} + annotations: + editor.position: '{"x": 200, "y": 100}' + cloud-pipelines.net/launchers/generic/resources.cpu: '1' + cloud-pipelines.net/launchers/generic/resources.memory: 1Gi + tangleml.com/launchers/kubernetes/multi_node/number_of_nodes: '2' + outputValues: {} diff --git a/public/example-pipelines/Intro-Secrets.pipeline.component.png b/public/example-pipelines/Intro-Secrets.pipeline.component.png new file mode 100644 index 000000000..1ed8ecbbb Binary files /dev/null and b/public/example-pipelines/Intro-Secrets.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Secrets.pipeline.component.yaml b/public/example-pipelines/Intro-Secrets.pipeline.component.yaml new file mode 100644 index 
000000000..716303e6b --- /dev/null +++ b/public/example-pipelines/Intro-Secrets.pipeline.component.yaml @@ -0,0 +1,148 @@ +name: 'Intro: Secrets' +description: | + Demonstrates Tangle's secret-injection feature: a pipeline input is bound to a + named secret at submit time via `dynamicData.secret`, and the value lands in + the running container as a file. The demo reads the secret, makes an + authenticated HTTP GET to https://httpbin.org/bearer using the secret as a + Bearer token, and prints httpbin's response. Uses only the public + python:3.12-slim image and the Python standard library — no Shopify-internal + components or data. + + Before submitting, create the secret on the cluster: + + tangle-deploy secrets create DEMO_BEARER_TOKEN --value 'any-string-works' + + Then submit with a config that maps the pipeline input to the secret name: + + tangle-deploy pipeline-run submit secrets_demo_pipeline.yaml \ + -f secrets_demo_pipeline.config.yaml --hydrate --no-wait + + The config (see secrets_demo_pipeline.config.yaml) just contains: + + pipeline_path: secrets_demo_pipeline.yaml + args: + DEMO_TOKEN: + dynamicData: + secret: + name: DEMO_BEARER_TOKEN + + Clean up after: `tangle-deploy secrets delete DEMO_BEARER_TOKEN`. +metadata: + annotations: + flex-nodes: '[{"id":"note-secrets","properties":{"title":"Secrets Feature","content":"Pipeline inputs can be bound to named secrets at submit time. The secret value is materialized inside the container as a file at the path passed via inputPath — never as an environment variable, never on the command line. 
This keeps the value out of pod specs, run YAML, and Tangle UI metadata.","color":"#E3F2FD"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"river-tangent"},"size":{"width":340,"height":180},"position":{"x":40,"y":-220},"zIndex":0},{"id":"note-secrets-flow","properties":{"title":"Two-step setup","content":"1) Create the secret in your Tangle instance via the Settings -> Secrets menu.\n2) Submit the pipeline with a config that maps the pipeline input to the secret name via dynamicData.secret. The value never appears in the pipeline YAML itself — only the name does, and only at submit time.","color":"#FFF3E0"},"metadata":{"createdAt":"2026-05-22T00:00:00.000Z","createdBy":"river-tangent"},"size":{"width":340,"height":200},"position":{"x":420,"y":-220},"zIndex":0,"readOnly":false,"highlighted":false}]' + editor.flow-direction: left-to-right + cloned_from_run_id: 019e50eb1c5468741cad +inputs: + - name: DEMO_TOKEN + description: | + Bearer token, supplied at submit time via dynamicData.secret. Read by the + `Authenticated Request` task as a file (inputPath) — never logged. + annotations: + editor.position: '{"x": -150, "y": 100}' +outputs: + - name: HTTP Response + annotations: + editor.position: '{"x":980,"y":100}' +implementation: + graph: + tasks: + Show Response: + componentRef: + name: Show Response + spec: + name: Show Response + description: Echoes the masked HTTP response to stdout and to the pipeline output. 
+ inputs: + - name: response + type: String + outputs: + - name: echoed + type: String + implementation: + container: + image: python:3.12-slim + command: + - sh + - '-ec' + - | + # $0 is the first positional arg (inputPath), $1 the second (outputPath) + cat "$0" + mkdir -p "$(dirname "$1")" + cp "$0" "$1" + - inputPath: response + - outputPath: echoed + arguments: + response: + taskOutput: + outputName: response + taskId: Authenticated Request + annotations: + editor.position: '{"x": 600, "y": 100}' + Authenticated Request: + componentRef: + name: Authenticated Request + spec: + name: Authenticated Request + description: | + Reads a bearer token from the secret file, sends a GET to + https://httpbin.org/bearer with `Authorization: Bearer <token>`, and + writes the JSON response body to its output. Prints the token length + (never the value) and the HTTP status to stdout. + inputs: + - name: token + description: Bearer token, injected from a Tangle secret. + outputs: + - name: response + type: String + description: httpbin's JSON response body. + implementation: + container: + image: python:3.12-slim + command: + - python3 + - '-u' + - '-c' + - | + import json, os, sys, urllib.request + + token_path, response_path = sys.argv[1], sys.argv[2] + with open(token_path) as f: + token = f.read().strip() + + print(f"loaded token: length={len(token)} (value not logged)") + + req = urllib.request.Request( + "https://httpbin.org/bearer", + headers={"Authorization": f"Bearer {token}"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + status = resp.status + body = resp.read().decode("utf-8") + + print(f"http status: {status}") + parsed = json.loads(body) + # httpbin echoes the token back — mask it before writing to the output. 
+ if "token" in parsed: + parsed["token"] = f"<{len(parsed['token'])} chars>" + masked = json.dumps(parsed, indent=2) + print(masked) + + os.makedirs(os.path.dirname(response_path), exist_ok=True) + with open(response_path, "w") as f: + f.write(masked + "\n") + args: + - inputPath: token + - outputPath: response + arguments: + token: + graphInput: + inputName: DEMO_TOKEN + annotations: + editor.position: '{"x": 200, "y": 100}' + cloud-pipelines.net/launchers/generic/resources.cpu: '1' + cloud-pipelines.net/launchers/generic/resources.memory: 512Mi + outputValues: + HTTP Response: + taskOutput: + outputName: echoed + taskId: Show Response diff --git a/public/example-pipelines/Intro-Subgraphs.pipeline.component.png b/public/example-pipelines/Intro-Subgraphs.pipeline.component.png new file mode 100644 index 000000000..4c5fd5175 Binary files /dev/null and b/public/example-pipelines/Intro-Subgraphs.pipeline.component.png differ diff --git a/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml b/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml new file mode 100644 index 000000000..b196c1b75 --- /dev/null +++ b/public/example-pipelines/Intro-Subgraphs.pipeline.component.yaml @@ -0,0 +1,1036 @@ +name: 'Intro: Subgraphs' +description: | + Organising work with subgraphs (nested pipelines). Three top-level tasks are themselves subgraphs — each with its own internal task graph, inputs, and outputs. Data Preparation generates and splits the data; Training fits a linear model; Evaluation predicts and scores. Double-click a subgraph in the UI to see inside it. +metadata: + annotations: + flex-nodes: '[{"id":"note-sg","properties":{"title":"Subgraphs","content":"Each node is a subgraph — a task whose componentRef.spec contains implementation.graph with its own tasks. 
Subgraphs encapsulate complexity: the top level stays clean while each subgraph handles its own internal wiring.\n\nDouble-click a subgraph to look inside.","color":"#F3E5F5"},"metadata":{"createdAt":"2026-05-19T00:00:00.000Z","createdBy":"tangle-examples"},"size":{"width":320,"height":140},"position":{"x":150,"y":-140},"zIndex":0,"readOnly":false,"highlighted":false}]' + editor.flow-direction: left-to-right +implementation: + graph: + tasks: + Training: + componentRef: + name: Training + digest: bb9f710b9685cc5e839510b94edb30cf5f5ba5b864743afa0292bcfbb821b792 + spec: + name: Training + description: Train a linear regression model + inputs: + - name: training_data + type: CSV + annotations: + editor.position: '{"x":0,"y":0}' + - name: target_column + type: String + default: target + annotations: + editor.position: '{"x":47,"y":176}' + outputs: + - name: model + type: JSON + annotations: + editor.position: '{"x":681,"y":76}' + implementation: + graph: + tasks: + Fit Model: + componentRef: + name: Train regression + digest: e4292a5974ba0c989f95fff77d993e75eb9c6b26ebe23d8df775f804d22309f0 + spec: + name: Train regression + description: |- + Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: train_regression.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def train_regression( + training_data: components.InputPath("CSV"), + model: components.OutputPath("JSON"), + target_column: str = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). 
The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. + """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + 
print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + python_original_code_path: train_regression.py + components new regenerate python-function-component: 'true' + inputs: + - name: training_data + type: CSV + description: Input CSV with feature columns and a target column. + - name: target_column + type: String + description: Name of the column to predict. + default: target + optional: true + outputs: + - name: model + type: JSON + description: Output JSON file with trained model parameters. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def train_regression( + training_data, + model, + target_column = "target", + ): + """Train a simple linear regression model using ordinary least squares. + + Fits weights and bias to minimise squared error. Uses only Python stdlib + (no numpy/sklearn). The trained model is saved as a JSON file containing + the weight vector, bias, feature names, and training metrics. + + Args: + training_data: Input CSV with feature columns and a target column. + model: Output JSON file with trained model parameters. + target_column: Name of the column to predict. 
+ """ + with open(training_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + feature_names = [k for k in rows[0].keys() if k != target_column] + n = len(rows) + p = len(feature_names) + + X = [[float(row[f]) for f in feature_names] for row in rows] + y = [float(row[target_column]) for row in rows] + + # Add bias column (column of 1s) + X_aug = [[1.0] + row for row in X] + cols = p + 1 + + # Normal equation: (X^T X)^-1 X^T y + # Compute X^T X + XtX = [[0.0] * cols for _ in range(cols)] + for i in range(cols): + for j in range(cols): + XtX[i][j] = sum(X_aug[k][i] * X_aug[k][j] for k in range(n)) + + # Compute X^T y + Xty = [sum(X_aug[k][i] * y[k] for k in range(n)) for i in range(cols)] + + # Solve via Gaussian elimination + aug = [XtX[i][:] + [Xty[i]] for i in range(cols)] + for i in range(cols): + max_row = max(range(i, cols), key=lambda r: abs(aug[r][i])) + aug[i], aug[max_row] = aug[max_row], aug[i] + pivot = aug[i][i] + if abs(pivot) < 1e-12: + continue + for j in range(i, cols + 1): + aug[i][j] /= pivot + for r in range(cols): + if r != i: + factor = aug[r][i] + for j in range(i, cols + 1): + aug[r][j] -= factor * aug[i][j] + + params = [aug[i][cols] for i in range(cols)] + bias = params[0] + weights = params[1:] + + # Training RMSE + predictions = [bias + sum(w * x for w, x in zip(weights, row)) for row in X] + mse = sum((p - a) ** 2 for p, a in zip(predictions, y)) / n + rmse = mse ** 0.5 + + model_data = { + "feature_names": feature_names, + "weights": [round(w, 6) for w in weights], + "bias": round(bias, 6), + "training_rmse": round(rmse, 6), + "training_rows": n, + } + + with open(model, "w") as f: + json.dump(model_data, f, indent=2) + + print(f"Trained on {n} rows, {p} features") + print(f"Weights: {dict(zip(feature_names, weights))}") + print(f"Bias: {bias:.4f}, Training RMSE: {rmse:.4f}") + + import argparse + _parser = argparse.ArgumentParser(prog='Train regression', description='Train a simple linear regression model using 
ordinary least squares.\n\nFits weights and bias to minimise squared error. Uses only Python stdlib\n(no numpy/sklearn). The trained model is saved as a JSON file containing\nthe weight vector, bias, feature names, and training metrics.') + _parser.add_argument("--training-data", dest="training_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-column", dest="target_column", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = train_regression(**_parsed_args) + args: + - '--training-data' + - inputPath: training_data + - if: + cond: + isPresent: target_column + then: + - '--target-column' + - inputValue: target_column + - '--model' + - outputPath: model + arguments: + target_column: + graphInput: + inputName: target_column + training_data: + graphInput: + inputName: training_data + annotations: + editor.position: '{"x":301,"y":55.5}' + outputValues: + model: + taskOutput: + outputName: model + taskId: Fit Model + arguments: + training_data: + taskOutput: + outputName: train_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 0}' + Evaluation: + componentRef: + name: Evaluation + digest: 3e29013959318f296132514881cba7efaf948bc0618098527f6c086c3f24c66f + spec: + name: Evaluation + description: Predict on test data and compute regression metrics + inputs: + - name: test_data + type: CSV + annotations: + editor.position: '{"x":0,"y":25.25}' + - name: model + type: JSON + annotations: + editor.position: '{"x":0,"y":201.25}' + outputs: + - name: predictions + type: CSV + annotations: + editor.position: '{"x":749.5,"y":0}' + - name: metrics + type: JSON + annotations: + editor.position: '{"x":1061,"y":202.5}' + implementation: + graph: + tasks: + Predict: + componentRef: + name: Predict + digest: 
4841c31fc75f2d26a5a7d3123d6fa6fc6b43d8badd549ad3ac3e20119860938d + spec: + name: Predict + description: |- + Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: predict.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + + + def predict( + test_data: components.InputPath("CSV"), + model: components.InputPath("JSON"), + predictions: components.OutputPath("CSV"), + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. 
+ """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + python_original_code_path: predict.py + components new regenerate python-function-component: 'true' + inputs: + - name: test_data + type: CSV + description: Input CSV with the same feature columns used in training. + - name: model + type: JSON + description: Trained model JSON (from train_regression). + outputs: + - name: predictions + type: CSV + description: Output CSV with actual and predicted values. + implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + + def predict( + test_data, + model, + predictions, + ): + """Apply a trained linear regression model to produce predictions. + + Reads the model JSON (weights + bias) and the test CSV, computes + predicted values, and writes a CSV with columns: actual, predicted. + + Args: + test_data: Input CSV with the same feature columns used in training. + model: Trained model JSON (from train_regression). + predictions: Output CSV with actual and predicted values. 
+ """ + with open(model, "r") as f: + model_data = json.load(f) + + feature_names = model_data["feature_names"] + weights = model_data["weights"] + bias = model_data["bias"] + + with open(test_data, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + target_col = [c for c in rows[0].keys() if c not in feature_names][0] + + with open(predictions, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["actual", "predicted"]) + for row in rows: + features = [float(row[feat]) for feat in feature_names] + pred = bias + sum(w * x for w, x in zip(weights, features)) + actual = float(row[target_col]) + writer.writerow([round(actual, 4), round(pred, 4)]) + + print(f"Generated {len(rows)} predictions") + + import argparse + _parser = argparse.ArgumentParser(prog='Predict', description='Apply a trained linear regression model to produce predictions.\n\nReads the model JSON (weights + bias) and the test CSV, computes\npredicted values, and writes a CSV with columns: actual, predicted.') + _parser.add_argument("--test-data", dest="test_data", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--model", dest="model", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--predictions", dest="predictions", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = predict(**_parsed_args) + args: + - '--test-data' + - inputPath: test_data + - '--model' + - inputPath: model + - '--predictions' + - outputPath: predictions + arguments: + model: + graphInput: + inputName: model + test_data: + graphInput: + inputName: test_data + annotations: + editor.position: '{"x":301,"y":86.75}' + Compute Metrics: + componentRef: + name: Evaluate + digest: c26e9e058d298c1c57dd96e15ea4261b99439a53fa9b323db4e9ef783933954c + spec: + name: Evaluate + description: |- + Compute regression metrics from a predictions CSV. 
+ + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + metadata: + annotations: + cloud_pipelines.net: 'true' + component_yaml_path: evaluate.component.yaml + python_original_code: | + from cloud_pipelines import components + import csv + import json + import math + + + def evaluate( + predictions: components.InputPath("CSV"), + metrics: components.OutputPath("JSON"), + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + python_original_code_path: evaluate.py + components new regenerate python-function-component: 'true' + inputs: + - name: predictions + type: CSV + description: Input CSV with actual and predicted columns. + outputs: + - name: metrics + type: JSON + description: Output JSON with computed regression metrics. 
+ implementation: + container: + image: python:3.12 + command: + - sh + - '-ec' + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + import csv + import json + import math + + def evaluate( + predictions, + metrics, + ): + """Compute regression metrics from a predictions CSV. + + Expects columns: actual, predicted. Outputs a JSON file with + MAE, MSE, RMSE, R-squared, and row count. + + Args: + predictions: Input CSV with actual and predicted columns. + metrics: Output JSON with computed regression metrics. + """ + with open(predictions, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + actuals = [float(r["actual"]) for r in rows] + preds = [float(r["predicted"]) for r in rows] + n = len(actuals) + + errors = [a - p for a, p in zip(actuals, preds)] + abs_errors = [abs(e) for e in errors] + sq_errors = [e ** 2 for e in errors] + + mae = sum(abs_errors) / n + mse = sum(sq_errors) / n + rmse = math.sqrt(mse) + + mean_actual = sum(actuals) / n + ss_tot = sum((a - mean_actual) ** 2 for a in actuals) + ss_res = sum(sq_errors) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0 + + result = { + "num_samples": n, + "mae": round(mae, 6), + "mse": round(mse, 6), + "rmse": round(rmse, 6), + "r_squared": round(r_squared, 6), + "max_error": round(max(abs_errors), 6), + } + + with open(metrics, "w") as f: + json.dump(result, f, indent=2) + + print(f"Evaluated {n} predictions:") + for k, v in result.items(): + print(f" {k}: {v}") + + import argparse + _parser = argparse.ArgumentParser(prog='Evaluate', description='Compute regression metrics from a predictions CSV.\n\nExpects columns: actual, predicted. 
Outputs a JSON file with\nMAE, MSE, RMSE, R-squared, and row count.') + _parser.add_argument("--predictions", dest="predictions", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--metrics", dest="metrics", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = evaluate(**_parsed_args) + args: + - '--predictions' + - inputPath: predictions + - '--metrics' + - outputPath: metrics + arguments: + predictions: + taskOutput: + outputName: predictions + taskId: Predict + annotations: + editor.position: '{"x":681,"y":200}' + outputValues: + metrics: + taskOutput: + outputName: metrics + taskId: Compute Metrics + predictions: + taskOutput: + outputName: predictions + taskId: Predict + arguments: + model: + taskOutput: + outputName: model + taskId: Training + test_data: + taskOutput: + outputName: test_data + taskId: Data Preparation + annotations: + editor.position: '{"x": 500, "y": 300}' + Data Preparation: + componentRef: + name: Data Preparation + digest: 653c0dddb09515926be064acab5ae1fc9ad0f25f4b9376882532740b91ca0658 + spec: + name: Data Preparation + description: Generate synthetic data and split into train/test + inputs: + - name: num_rows + type: Integer + default: '500' + annotations: + editor.position: '{"x":0,"y":32.5}' + - name: train_fraction + type: Float + default: '0.8' + annotations: + editor.position: '{"x":288,"y":241}' + outputs: + - name: train_data + type: CSV + annotations: + editor.position: '{"x":954,"y":24.75}' + - name: test_data + type: CSV + annotations: + editor.position: '{"x":958,"y":224.75}' + implementation: + graph: + tasks: + Split: + componentRef: + name: Split csv + digest: 7dbbe3ac41f4e820f0d168ef355ada703716f4593eb5e70664746eebe0fe79e7 + spec: + name: Split csv + description: |- + Split a CSV dataset into train and test sets. + + Randomly shuffles rows, then splits by the given fraction. 
def split_csv(
    input_data,
    train_data,
    test_data,
    train_fraction = 0.8,
    random_seed = 42,
):
    """Split a CSV dataset into train and test sets.

    Randomly shuffles rows, then splits by the given fraction.
    Both output files keep the same header row.

    Args:
        input_data: Input CSV file.
        train_data: Output CSV for the training split.
        test_data: Output CSV for the test split.
        train_fraction: Fraction of rows for training (0.0 to 1.0).
        random_seed: Seed for reproducible shuffling.

    Raises:
        ValueError: If train_fraction is outside [0.0, 1.0], or if the
            input file has no header row.
    """
    # A negative fraction would become a negative slice index and silently
    # yield a plausible-looking but wrong split (e.g. -0.2 behaves like 0.8),
    # so reject out-of-range values up front.
    if not 0.0 <= train_fraction <= 1.0:
        raise ValueError(
            f"train_fraction must be between 0.0 and 1.0, got {train_fraction!r}"
        )

    random.seed(random_seed)

    with open(input_data, "r") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # A completely empty file would otherwise surface as a bare StopIteration.
            raise ValueError(f"Input CSV {input_data!r} is empty (no header row).")
        rows = list(reader)

    random.shuffle(rows)
    split_idx = int(len(rows) * train_fraction)
    train_rows = rows[:split_idx]
    test_rows = rows[split_idx:]

    for path, subset in [(train_data, train_rows), (test_data, test_rows)]:
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(subset)

    print(f"Split {len(rows)} rows -> {len(train_rows)} train, {len(test_rows)} test")
def generate_dataset(
    output_data,
    num_rows = 500,
    random_seed = 42,
):
    """Write a synthetic 4-feature regression dataset to a CSV file.

    The file has the columns feature_1 .. feature_4 and target. Each feature
    is drawn from a standard normal distribution; the target is a fixed
    linear combination of the features plus a bias and Gaussian noise.
    All values are rounded to 4 decimal places.

    Args:
        output_data: Output CSV file path.
        num_rows: Number of rows to generate.
        random_seed: Seed for reproducibility.
    """
    random.seed(random_seed)

    coefficients = [1.5, -2.0, 0.8, 3.2]
    intercept = 5.0
    noise_std = 0.5
    column_names = ["feature_1", "feature_2", "feature_3", "feature_4", "target"]

    with open(output_data, "w", newline="") as out_file:
        csv_out = csv.writer(out_file)
        csv_out.writerow(column_names)

        row_index = 0
        while row_index < num_rows:
            # Draw the four features first, then the noise term, so the
            # random stream is consumed in a fixed order per row.
            xs = []
            for _ in range(4):
                xs.append(random.gauss(0, 1))

            linear_part = 0
            for coef, x in zip(coefficients, xs):
                linear_part += coef * x

            y = intercept + linear_part
            y = y + random.gauss(0, noise_std)

            record = [round(x, 4) for x in xs]
            record.append(round(y, 4))
            csv_out.writerow(record)
            row_index += 1
a/src/components/Home/PipelineSection/PipelineSection.tsx b/src/components/Home/PipelineSection/PipelineSection.tsx index 6f97480c9..96b2b5608 100644 --- a/src/components/Home/PipelineSection/PipelineSection.tsx +++ b/src/components/Home/PipelineSection/PipelineSection.tsx @@ -1,10 +1,10 @@ import { Link } from "@tanstack/react-router"; import { useEffect, useState } from "react"; +import { ExamplePipelines } from "@/components/Learn/ExamplePipelines"; import { LoadingScreen } from "@/components/shared/LoadingScreen"; import NewPipelineButton from "@/components/shared/NewPipelineButton"; import { PaginationControls } from "@/components/shared/PaginationControls"; -import QuickStartCards from "@/components/shared/QuickStart/QuickStartCards"; import { withSuspenseWrapper } from "@/components/shared/SuspenseWrapper"; import { Button } from "@/components/ui/button"; import { Checkbox } from "@/components/ui/checkbox"; @@ -21,7 +21,7 @@ import { } from "@/components/ui/table"; import { Paragraph, Text } from "@/components/ui/typography"; import { usePagination } from "@/hooks/usePagination"; -import { QUICK_START_PATH } from "@/routes/router"; +import { APP_ROUTES } from "@/routes/router"; import { type ComponentFileEntry, getAllComponentFilesFromList, @@ -125,7 +125,7 @@ export const PipelineSection = withSuspenseWrapper( You don't have any pipelines yet. Get started with a template below. - + Or start from scratch with @@ -143,7 +143,7 @@ export const PipelineSection = withSuspenseWrapper( } + actions={} /> @@ -220,10 +220,10 @@ export const PipelineSection = withSuspenseWrapper( PipelineSectionSkeleton, ); -function QuickStartButton() { +function ExamplePipelineButton() { return (