diff --git a/api/config/observability.go b/api/config/observability.go index 21d74d8ea..2252c9dfd 100644 --- a/api/config/observability.go +++ b/api/config/observability.go @@ -8,7 +8,6 @@ import ( // ObservabilityPublisher type ObservabilityPublisher struct { ArizeSink ArizeSink - BigQuerySink BigQuerySink MaxComputeSink MaxComputeSink KafkaConsumer KafkaConsumer ImageName string @@ -46,14 +45,6 @@ func (az ArizeSink) IsEnabled(modelSerial string) bool { return false } -// BigQuerySink -type BigQuerySink struct { - Project string - Dataset string - TTLDays int - Enabled bool -} - // MaxComputeSink type MaxComputeSink struct { Project string diff --git a/api/pkg/observability/deployment/config.go b/api/pkg/observability/deployment/config.go index 281b9b155..d6ad709ca 100644 --- a/api/pkg/observability/deployment/config.go +++ b/api/pkg/observability/deployment/config.go @@ -49,12 +49,6 @@ type ArizeSink struct { SpaceKey string `yaml:"space_key"` } -type BigQuerySink struct { - Project string `yaml:"project"` - Dataset string `yaml:"dataset"` - TTLDays int `yaml:"ttl_days"` -} - type MaxComputeSink struct { Project string `yaml:"project"` Dataset string `yaml:"dataset"` diff --git a/api/pkg/observability/deployment/deployment.go b/api/pkg/observability/deployment/deployment.go index 60f0905b2..d93d35ef2 100644 --- a/api/pkg/observability/deployment/deployment.go +++ b/api/pkg/observability/deployment/deployment.go @@ -262,16 +262,6 @@ func (c *deployer) applySecret(ctx context.Context, data *models.WorkerData) (se func (c *deployer) createSecretSpec(data *models.WorkerData) (*corev1.Secret, error) { observationSinks := []ObservationSink{} - if c.consumerConfig.BigQuerySink.Enabled { - observationSinks = append(observationSinks, ObservationSink{ - Type: BQ, - Config: BigQuerySink{ - Project: c.consumerConfig.BigQuerySink.Project, - Dataset: c.consumerConfig.BigQuerySink.Dataset, - TTLDays: c.consumerConfig.BigQuerySink.TTLDays, - }, - }) - } if c.consumerConfig.MaxComputeSink.Enabled { observationSinks = append(observationSinks, ObservationSink{ Type: MaxCompute, @@ -429,7 +419,7 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri }, }, } - podSpecWithIdentity := enrichIdentityToPod(podSpec, c.consumerConfig.ServiceAccountSecretName, []string{workerContainer}) + numReplicas := c.consumerConfig.Replicas if data.ResourceRequest != nil && data.ResourceRequest.Replica > 0 { numReplicas = data.ResourceRequest.Replica @@ -458,7 +448,7 @@ func (c *deployer) createDeploymentSpec(data *models.WorkerData, secretName stri PublisherRevisionAnnotationKey: strconv.Itoa(data.Revision), }, }, - Spec: podSpecWithIdentity, + Spec: podSpec, }, }, }, nil diff --git a/api/pkg/observability/deployment/deployment_test.go b/api/pkg/observability/deployment/deployment_test.go index 449c026fb..992fceded 100644 --- a/api/pkg/observability/deployment/deployment_test.go +++ b/api/pkg/observability/deployment/deployment_test.go @@ -102,11 +102,6 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour MountPath: "/mlobs/observation-publisher/conf/environment", ReadOnly: true, }, - { - Name: "iam-secret", - MountPath: fmt.Sprintf("/iam/%s", serviceAccountSecretName), - ReadOnly: true, - }, }, Ports: []corev1.ContainerPort{ { @@ -115,12 +110,6 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour Protocol: corev1.ProtocolTCP, }, }, - Env: []corev1.EnvVar{ - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: fmt.Sprintf("/iam/%s/service-account.json", serviceAccountSecretName), - }, - }, }, }, Volumes: []corev1.Volume{ @@ -132,14 +121,6 @@ func createDeploymentSpec(data *models.WorkerData, resourceRequest corev1.Resour }, }, }, - { - Name: "iam-secret", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: serviceAccountSecretName, - }, - }, - }, }, }, }, @@ -208,12 +189,6 @@ func Test_deployer_Deploy(t *testing.T) { SpaceKey: "space-key", EnabledModelSerials: "project-1_model-1", }, - BigQuerySink: config.BigQuerySink{ - Project: "bq-project", - Dataset: "dataset", - TTLDays: 10, - Enabled: true, - }, MaxComputeSink: config.MaxComputeSink{ Project: "max-project", Dataset: "dataset", @@ -329,7 +304,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }}, nil, false) deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments) @@ -401,7 +376,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }}, fmt.Errorf("deployment control plane is down"), false) return clientSet @@ -453,7 +428,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }}, nil, false) deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments) @@ -521,7 +496,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }, nil) prependUpsertSecretReactor(t, secretAPI, []*corev1.Secret{ @@ -539,7 +514,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }}, nil, true) deploymentAPI := clientSet.AppsV1().Deployments(namespace).(*fakeappsv1.FakeDeployments) @@ -609,7 +584,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }, nil) prependUpsertSecretReactor(t, secretAPI, []*corev1.Secret{ @@ -627,7 +602,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "project: project-1\nmodel_id: model-1\nmodel_version: \"2\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-2-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }, { ObjectMeta: metav1.ObjectMeta{ @@ -643,7 +618,7 @@ func Test_deployer_Deploy(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, }, }, nil, true) @@ -822,7 +797,7 @@ func Test_deployer_GetDeployedManifest(t *testing.T) { }, }, StringData: map[string]string{ - "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: BIGQUERY\n config:\n project: bq-project\n dataset: dataset\n ttl_days: 10\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", + "config.yaml": "model_id: model-1\nmodel_version: \"1\"\ninference_schema:\n session_id_column: session_id\n row_id_column: row_id\n model_prediction_output:\n actual_score_column: \"\"\n negative_class_label: negative\n prediction_score_column: prediction_score\n prediction_label_column: prediction_label\n positive_class_label: positive\n score_threshold: null\n output_class: BinaryClassificationOutput\n tag_columns:\n - tag\n feature_types:\n featureA: float64\n featureB: float64\n featureC: int64\n featureD: boolean\n feature_orders: []\nobservation_sinks:\n- type: ARIZE\n config:\n api_key: api-key\n space_key: space-key\n- type: MAXCOMPUTE\n config:\n project: max-project\n dataset: dataset\n ttl_days: 0\n access_key_id: key\n access_key_secret: secret\n access_url: url\nobservation_source:\n type: KAFKA\n config:\n topic: caraml-project-1-model-1-1-prediction-log\n bootstrap_servers: broker-1\n group_id: group-id\n batch_size: 100\n additional_consumer_config:\n auto.offset.reset: latest\n fetch.min.bytes: \"1024000\"\n", }, } workerData := &models.WorkerData{ diff --git a/api/pkg/observability/deployment/identity.go b/api/pkg/observability/deployment/identity.go deleted file mode 100644 index 9f6c01727..000000000 --- a/api/pkg/observability/deployment/identity.go +++ /dev/null @@ -1,55 +0,0 @@ -package deployment - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" -) - -func enrichIdentityToPod(podSpec corev1.PodSpec, secretName string, containerNames []string) corev1.PodSpec { - secretVolume := createVolumeFromSecret(secretName) - updatedPodSpec := podSpec.DeepCopy() - - containerExist := false - containerNameLookup := make(map[string]bool) - for _, containerName := range containerNames { - containerNameLookup[containerName] = true - } - - for idx, containerSpec := range updatedPodSpec.Containers { - if val := containerNameLookup[containerSpec.Name]; !val { - continue - } - - containerExist = true - mountPath := fmt.Sprintf("/iam/%s", secretName) - volumeMount := corev1.VolumeMount{ - Name: secretVolume.Name, - MountPath: mountPath, - ReadOnly: true, - } - containerSpec.VolumeMounts = append(containerSpec.VolumeMounts, volumeMount) - gcpCredentialEnvVar := corev1.EnvVar{ - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: fmt.Sprintf("%s/service-account.json", mountPath), - } - containerSpec.Env = append(containerSpec.Env, gcpCredentialEnvVar) - updatedPodSpec.Containers[idx] = containerSpec - } - - if containerExist { - updatedPodSpec.Volumes = append(updatedPodSpec.Volumes, secretVolume) - } - return *updatedPodSpec -} - -func createVolumeFromSecret(secretName string) corev1.Volume { - return corev1.Volume{ - Name: "iam-secret", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: secretName, - }, - }, - } -} diff --git a/api/pkg/observability/deployment/identity_test.go b/api/pkg/observability/deployment/identity_test.go deleted file mode 100644 index f4d2fdf7a..000000000 --- a/api/pkg/observability/deployment/identity_test.go +++ /dev/null @@ -1,425 +0,0 @@ -package deployment - -import ( - "testing" - - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" -) - -func Test_gcp_enrichIdentityToPod(t *testing.T) { - singleContainerPodSpec := corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "worker", - Image: "python:3.10", - Command: []string{ - "python", - "-m", - "publisher", - "+environment=config", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "prom-metric", - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - }, - } - - multipleContainerPodSpec := corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "worker", - Image: "python:3.10", - Command: []string{ - "python", - "-m", - "publisher", - "+environment=config", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "env-secret-volume", - MountPath: "/env", - ReadOnly: true, - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "prom-metric", - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "WORKERS", - Value: "1", - }, - }, - }, - { - Name: "sidecar", - Image: "python:3.10", - Command: []string{ - "./run.sh", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "env-secret-volume", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "secret-1", - }, - }, - }, - }, - } - type args struct { - podSpec corev1.PodSpec - secretName string - containerNames []string - } - tests := []struct { - name string - args args - want corev1.PodSpec - }{ - { - name: "update identity for one container", - args: args{ - podSpec: singleContainerPodSpec, - secretName: "service-account", - containerNames: []string{"worker"}, - }, - want: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "worker", - Image: "python:3.10", - Command: []string{ - "python", - "-m", - "publisher", - "+environment=config", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "prom-metric", - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "iam-secret", - MountPath: "/iam/service-account", - ReadOnly: true, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: "/iam/service-account/service-account.json", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "iam-secret", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "service-account", - }, - }, - }, - }, - }, - }, - { - name: "update identity for one container, but no matching container name", - args: args{ - podSpec: singleContainerPodSpec, - secretName: "service-account", - containerNames: []string{"not-exist"}, - }, - want: singleContainerPodSpec, - }, - { - name: "update identity for multiple containers", - args: args{ - podSpec: multipleContainerPodSpec, - secretName: "service-account", - containerNames: []string{"worker", "sidecar"}, - }, - want: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "worker", - Image: "python:3.10", - Command: []string{ - "python", - "-m", - "publisher", - "+environment=config", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "env-secret-volume", - MountPath: "/env", - ReadOnly: true, - }, - { - Name: "iam-secret", - MountPath: "/iam/service-account", - ReadOnly: true, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "WORKERS", - Value: "1", - }, - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: "/iam/service-account/service-account.json", - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "prom-metric", - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - { - Name: "sidecar", - Image: "python:3.10", - Command: []string{ - "./run.sh", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "iam-secret", - MountPath: "/iam/service-account", - ReadOnly: true, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: "/iam/service-account/service-account.json", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "env-secret-volume", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "secret-1", - }, - }, - }, - { - Name: "iam-secret", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "service-account", - }, - }, - }, - }, - }, - }, - { - name: "update identity for multiple containers, only match one container", - args: args{ - podSpec: multipleContainerPodSpec, - secretName: "service-account", - containerNames: []string{"worker"}, - }, - want: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "worker", - Image: "python:3.10", - Command: []string{ - "python", - "-m", - "publisher", - "+environment=config", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "env-secret-volume", - MountPath: "/env", - ReadOnly: true, - }, - { - Name: "iam-secret", - MountPath: "/iam/service-account", - ReadOnly: true, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "WORKERS", - Value: "1", - }, - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: "/iam/service-account/service-account.json", - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "prom-metric", - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - { - Name: "sidecar", - Image: "python:3.10", - Command: []string{ - "./run.sh", - }, - ImagePullPolicy: corev1.PullIfNotPresent, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "env-secret-volume", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "secret-1", - }, - }, - }, - { - Name: "iam-secret", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "service-account", - }, - }, - }, - }, - }, - }, - { - name: "update identity for multiple containers, but none matching container name", - args: args{ - podSpec: multipleContainerPodSpec, - secretName: "service-account", - containerNames: []string{"worker-1", "sidecar-1"}, - }, - want: multipleContainerPodSpec, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := enrichIdentityToPod(tt.args.podSpec, tt.args.secretName, tt.args.containerNames) - assert.Equal(t, tt.want, got) - }) - } -} diff --git a/python/observation-publisher/publisher/observation_sink.py b/python/observation-publisher/publisher/observation_sink.py index a6b798077..5d173a412 100644 --- a/python/observation-publisher/publisher/observation_sink.py +++ b/python/observation-publisher/publisher/observation_sink.py @@ -16,10 +16,6 @@ from arize.utils.types import Environments from arize.utils.types import ModelTypes as ArizeModelType from dataclasses_json import dataclass_json -from google.api_core.exceptions import NotFound -from google.cloud.bigquery import Client as BigQueryClient -from google.cloud.bigquery import (SchemaField, Table, TimePartitioning, - TimePartitioningType) from merlin.observability.inference import (BinaryClassificationOutput, InferenceSchema, RankingOutput, RegressionOutput, @@ -143,183 +139,6 @@ def write(self, df: pd.DataFrame): print(f"Failed to log to Arize: {e}") raise e - -@dataclass_json -@dataclass -class BigQueryRetryConfig: - """ - Configuration for retrying failed write attempts. Write could fail due to BigQuery - taking time to update the table schema / create new table. - Attributes: - enabled: Whether to retry failed write attempts - retry_attempts: Number of retry attempts - retry_interval_seconds: Interval between retry attempts - """ - - enabled: bool = False - retry_attempts: int = 4 - retry_interval_seconds: int = 30 - - -@dataclass_json -@dataclass -class BigQueryConfig: - """ - Configuration for writing to BigQuery - Attributes: - project: GCP project id - dataset: BigQuery dataset name - ttl_days: Time to live for the date partition - retry: Configuration for retrying failed write attempts - """ - - project: str - dataset: str - ttl_days: int - retry: BigQueryRetryConfig = BigQueryRetryConfig() - - -class BigQuerySink(ObservationSink): - """ - Writes prediction logs to BigQuery. If the destination table doesn't exist, it will be created based on the inference schema. - """ - - def __init__( - self, - project: str, - inference_schema: InferenceSchema, - model_id: str, - model_version: str, - config: BigQueryConfig, - ): - """ - :param project: CaraML project - :param inference_schema: Inference schema for the ingested model - :param model_id: Merlin model id - :param model_version: Merlin model version - :param config: Configuration to write to bigquery sink - """ - super().__init__(project, inference_schema, model_id, model_version) - self._client = BigQueryClient() - self._config = config - self._table = self.create_or_update_table() - - @property - def bq_project(self) -> str: - return self._config.project - - @property - def dataset(self) -> str: - return self._config.dataset - - @property - def retry(self) -> BigQueryRetryConfig: - return self._config.retry - - def create_or_update_table(self) -> Table: - try: - original_table = self._client.get_table(self.write_location) - original_schema = original_table.schema - migrated_schema = original_schema[:] - for field in self.schema_fields: - if field not in original_schema: - migrated_schema.append(field) - if migrated_schema == original_schema: - return original_table - original_table.schema = migrated_schema - return self._client.update_table(original_table, ["schema"]) - except NotFound: - table = Table(self.write_location, schema=self.schema_fields) - table.time_partitioning = TimePartitioning( - type_=TimePartitioningType.DAY, - field=PREDICTION_LOG_TIMESTAMP_COLUMN, - expiration_ms=self._config.ttl_days * 24 * 60 * 60 * 1000, - ) - return self._client.create_table(table=table) - - @property - def schema_fields(self) -> List[SchemaField]: - value_type_to_bq_type = { - ValueType.INT64: "INTEGER", - ValueType.FLOAT64: "FLOAT", - ValueType.BOOLEAN: "BOOLEAN", - ValueType.STRING: "STRING", - } - - schema_fields = [ - SchemaField( - name=self._inference_schema.session_id_column, - field_type="STRING", - ), - SchemaField( - name=self._inference_schema.row_id_column, - field_type="STRING", - ), - SchemaField( - name=PREDICTION_LOG_TIMESTAMP_COLUMN, - field_type="TIMESTAMP", - ), - SchemaField( - name=PREDICTION_LOG_MODEL_VERSION_COLUMN, - field_type="STRING", - ), - ] - for feature, feature_type in self._inference_schema.feature_types.items(): - schema_fields.append( - SchemaField( - name=feature, field_type=value_type_to_bq_type[feature_type] - ) - ) - for ( - prediction, - prediction_type, - ) in self._inference_schema.model_prediction_output.prediction_types().items(): - schema_fields.append( - SchemaField( - name=prediction, field_type=value_type_to_bq_type[prediction_type] - ) - ) - - return schema_fields - - @property - def write_location(self) -> str: - """ - Returns the BigQuery table location to write the prediction logs, which will be unique - for each CaraML project / model pair. Different versions of a model share the same table. - :return: - """ - table_name = f"prediction_log_{self._project}_{self._model_id}".replace("-", "_").replace( - ".", "_" - ) - return f"{self.bq_project}.{self.dataset}.{table_name}" - - def write(self, dataframe: pd.DataFrame): - for i in range(0, self.retry.retry_attempts + 1): - try: - response = self._client.insert_rows_from_dataframe( - dataframe=dataframe, table=self._table - ) - errors = [error for error_chunk in response for error in error_chunk] - if len(errors) > 0: - if not self.retry.enabled: - print("Errors when inserting rows to BigQuery") - return - else: - print( - f"Errors when inserting rows to BigQuery, retrying attempt {i}/{self.retry.retry_attempts}" - ) - time.sleep(self.retry.retry_interval_seconds) - else: - return - except NotFound as e: - print( - f"Table not found: {e}, retrying attempt {i}/{self.retry.retry_attempts}" - ) - time.sleep(self.retry.retry_interval_seconds) - print(f"Failed to write to BigQuery after {self.retry.retry_attempts} attempts") - - @dataclass_json @dataclass class MaxComputeRetryConfig: @@ -409,7 +228,7 @@ def _get_column_values_for_query(self, schema_fields) -> str: column_queries.append(str(column.name) + " " + str(column.type)) return ",".join(column_queries) - def create_or_update_table(self) -> Table: + def create_or_update_table(self): try: original_table = self._client.get_table(self.write_location) original_schema = original_table.table_schema @@ -553,16 +372,6 @@ def new_observation_sink( model_version: str, ) -> ObservationSink: match sink_config.type: - case ObservationSinkType.BIGQUERY: - bq_config: BigQueryConfig = BigQueryConfig.from_dict(sink_config.config) # type: ignore[attr-defined] - - return BigQuerySink( - project=project, - inference_schema=inference_schema, - model_id=model_id, - model_version=model_version, - config=bq_config, - ) case ObservationSinkType.ARIZE: arize_config: ArizeConfig = ArizeConfig.from_dict(sink_config.config) # type: ignore[attr-defined] client = ArizeClient( diff --git a/python/observation-publisher/requirements.in b/python/observation-publisher/requirements.in index f81865b3b..d3ae0db51 100644 --- a/python/observation-publisher/requirements.in +++ b/python/observation-publisher/requirements.in @@ -3,7 +3,6 @@ caraml-upi-protos>=1.0.0 arize>=7.7.0 hydra-core>=1.3.0 pandas>=1.0.0 -google-cloud-bigquery prometheus-client >= 0.19.0 typing-extensions==4.9.0 pyodps==0.12.2 diff --git a/python/observation-publisher/requirements.txt b/python/observation-publisher/requirements.txt index d614dcad0..41f2dae1d 100644 --- a/python/observation-publisher/requirements.txt +++ b/python/observation-publisher/requirements.txt @@ -6,302 +6,79 @@ # -e file:../sdk # via -r requirements.in -alembic==1.13.1 - # via mlflow -annotated-types==0.6.0 - # via pydantic antlr4-python3-runtime==4.9.3 # via # hydra-core # omegaconf arize==7.10.2 # via -r requirements.in -arrow==1.3.0 - # via cookiecutter -binaryornot==0.4.4 - # via cookiecutter -blinker==1.7.0 - # via flask -boto3==1.35.39 - # via merlin-sdk -botocore==1.35.39 - # via - # boto3 - # s3transfer -cachetools==5.3.2 - # via google-auth -caraml-auth-google==0.0.0.post9 - # via merlin-sdk caraml-upi-protos==1.0.0 - # via - # -r requirements.in - # merlin-sdk + # via -r requirements.in certifi==2024.2.2 - # via - # merlin-sdk - # requests -chardet==5.2.0 - # via binaryornot + # via requests charset-normalizer==3.3.2 # via requests -click==8.1.3 - # via - # cookiecutter - # databricks-cli - # flask - # merlin-sdk - # mlflow -cloudpickle==2.0.0 - # via - # merlin-sdk - # mlflow confluent-kafka==2.3.0 # via -r requirements.in -cookiecutter==2.5.0 - # via merlin-sdk -databricks-cli==0.18.0 - # via mlflow -dataclasses-json==0.6.4 - # via merlin-sdk -docker==6.1.3 - # via - # merlin-sdk - # mlflow -entrypoints==0.4 - # via mlflow -flask==3.0.2 - # via - # mlflow - # prometheus-flask-exporter -gitdb==4.0.11 - # via gitpython -gitpython==3.1.42 - # via - # merlin-sdk - # mlflow -google-api-core==2.17.1 - # via - # google-cloud-bigquery - # google-cloud-core - # google-cloud-storage -google-auth==2.28.0 - # via - # caraml-auth-google - # google-api-core - # google-cloud-core - # google-cloud-storage -google-cloud-bigquery==3.17.2 - # via -r requirements.in -google-cloud-core==2.4.1 - # via - # google-cloud-bigquery - # google-cloud-storage -google-cloud-storage==2.14.0 - # via merlin-sdk -google-crc32c==1.5.0 - # via - # google-cloud-storage - # google-resumable-media -google-resumable-media==2.7.0 - # via - # google-cloud-bigquery - # google-cloud-storage googleapis-common-protos==1.62.0 # via # arize # caraml-upi-protos - # google-api-core grpcio==1.60.1 - # via - # grpcio-tools - # merlin-sdk + # via grpcio-tools grpcio-tools==1.60.1 - # via - # caraml-upi-protos - # merlin-sdk -gunicorn==21.2.0 - # via mlflow + # via caraml-upi-protos hydra-core==1.3.2 # via -r requirements.in idna==3.6 # via requests -importlib-metadata==7.0.1 - # via mlflow -itsdangerous==2.1.2 - # via flask -jinja2==3.1.3 - # via - # cookiecutter - # flask -jmespath==1.0.1 - # via - # boto3 - # botocore -mako==1.3.2 - # via alembic -markdown-it-py==3.0.0 - # via rich -markupsafe==2.1.5 - # via - # jinja2 - # mako - # werkzeug -marshmallow==3.20.2 - # via dataclasses-json -mdurl==0.1.2 - # via markdown-it-py -mlflow==1.26.1 - # via merlin-sdk -mypy-extensions==1.0.0 - # via typing-inspect numpy==1.26.4 # via - # merlin-sdk - # mlflow # pandas # pyarrow - # scipy -oauthlib==3.2.2 - # via databricks-cli omegaconf==2.3.0 # via hydra-core packaging==23.2 - # via - # docker - # google-cloud-bigquery - # gunicorn - # hydra-core - # marshmallow - # mlflow + # via hydra-core pandas==2.2.0 # via # -r requirements.in # arize - # mlflow prometheus-client==0.20.0 - # via - # -r requirements.in - # prometheus-flask-exporter -prometheus-flask-exporter==0.23.0 - # via mlflow + # via -r requirements.in protobuf==4.25.3 # via # arize - # google-api-core # googleapis-common-protos # grpcio-tools - # merlin-sdk - # mlflow pyarrow==15.0.0 # via # arize # pyodps -pyasn1==0.5.1 - # via - # pyasn1-modules - # rsa -pyasn1-modules==0.3.0 - # via google-auth -pydantic==2.5.3 - # via merlin-sdk -pydantic-core==2.14.6 - # via pydantic -pygments==2.17.2 - # via rich -pyjwt==2.8.0 - # via databricks-cli pyodps==0.12.2 # via -r requirements.in -pyprind==2.11.3 - # via merlin-sdk python-dateutil==2.8.2 - # via - # arrow - # botocore - # google-cloud-bigquery - # merlin-sdk - # pandas -python-slugify==8.0.4 - # via cookiecutter + # via pandas pytz==2022.7.1 - # via - # mlflow - # pandas + # via pandas pyyaml==6.0.1 - # via - # cookiecutter - # merlin-sdk - # mlflow - # omegaconf -querystring-parser==1.2.4 - # via mlflow + # via omegaconf requests==2.31.0 # via - # cookiecutter - # databricks-cli - # docker - # google-api-core - # google-cloud-bigquery - # google-cloud-storage - # mlflow # pyodps # requests-futures requests-futures==1.0.0 # via arize -rich==13.7.0 - # via cookiecutter -rsa==4.9 - # via google-auth -s3transfer==0.10.0 - # via boto3 -scipy==1.12.0 - # via mlflow six==1.16.0 - # via - # databricks-cli - # merlin-sdk - # python-dateutil - # querystring-parser -smmap==5.0.1 - # via gitdb -sqlalchemy==1.4.54 - # via - # alembic - # mlflow -sqlparse==0.4.4 - # via mlflow -tabulate==0.9.0 - # via databricks-cli -text-unidecode==1.3 - # via python-slugify + # via python-dateutil tqdm==4.66.2 # via arize -types-python-dateutil==2.8.19.20240106 - # via arrow typing-extensions==4.9.0 - # via - # -r requirements.in - # alembic - # pydantic - # pydantic-core - # typing-inspect -typing-inspect==0.9.0 - # via dataclasses-json + # via -r requirements.in tzdata==2024.1 # via pandas urllib3==2.0.7 - # via - # botocore - # databricks-cli - # docker - # merlin-sdk - # requests -websocket-client==1.7.0 - # via docker -werkzeug==3.0.1 - # via flask -zipp==3.17.0 - # via importlib-metadata + # via requests # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/python/observation-publisher/tests/common_fixtures.py b/python/observation-publisher/tests/common_fixtures.py deleted file mode 100644 index db018fb57..000000000 --- a/python/observation-publisher/tests/common_fixtures.py +++ /dev/null @@ -1,13 +0,0 @@ -import os - -import pytest - - -@pytest.fixture -def bq_project() -> str: - return os.environ.get("INTEGRATION_TEST_BQ_PROJECT") - - -@pytest.fixture -def bq_dataset() -> str: - return os.environ.get("INTEGRATION_TEST_BQ_DATASET") diff --git a/python/observation-publisher/tests/test_observation_sink.py b/python/observation-publisher/tests/test_observation_sink.py index 1bc164091..a976d87f2 100644 --- a/python/observation-publisher/tests/test_observation_sink.py +++ b/python/observation-publisher/tests/test_observation_sink.py @@ -7,17 +7,13 @@ import pytest from arize.pandas.logger import Client from dateutil import tz -from google.cloud.bigquery import Client as BigQueryClient -from google.cloud.bigquery import SchemaField from merlin.observability.inference import (BinaryClassificationOutput, InferenceSchema, RankingOutput, ValueType, ObservationType) from pandas._testing import assert_frame_equal from requests import Response -from publisher.observation_sink import (ArizeSink, BigQueryConfig, - BigQueryRetryConfig, BigQuerySink) -from tests.common_fixtures import bq_dataset, bq_project +from publisher.observation_sink import ArizeSink @pytest.fixture @@ -143,105 +139,3 @@ def test_ranking_model_arize_schema( ) df = ranking_inference_schema.preprocess(ranking_inference_logs, [ObservationType.PREDICTION]) arize_sink.write(df) - - -@pytest.mark.integration -def test_bigquery_sink_schema_migration( - bq_project: str, - bq_dataset: str, - binary_classification_inference_schema: InferenceSchema, - binary_classification_inference_logs: pd.DataFrame, -): - client = BigQueryClient() - client.delete_table( - f"{bq_project}.{bq_dataset}.prediction_log_test_project_test_model", not_found_ok=True - ) - bq_sink = BigQuerySink( - "test-project", - binary_classification_inference_schema, - "test-model", - "0.1.0", - config=BigQueryConfig( - project=bq_project, - dataset=bq_dataset, - ttl_days=14, - retry=BigQueryRetryConfig( - enabled=True, retry_attempts=3, retry_interval_seconds=10 - ), - ), - ) - bq_sink.write(binary_classification_inference_logs) - migrated_schema = dataclasses.replace(binary_classification_inference_schema) - migrated_schema.feature_types = { - "rating_v2": ValueType.FLOAT64, - } - migrated_bq_sink = BigQuerySink( - migrated_schema, - "test-model", - "0.2.0", - config=BigQueryConfig( - project=bq_project, - dataset=bq_dataset, - ttl_days=14, - retry=BigQueryRetryConfig( - enabled=True, retry_attempts=5, retry_interval_seconds=30 - ), - ), - ) - migrated_inference_logs = binary_classification_inference_logs.rename( - columns={"rating": "rating_v2"} - ) - migrated_inference_logs["model_version"] = "0.2.0" - migrated_bq_sink.write(migrated_inference_logs) - version_update_bq_sink = BigQuerySink( - migrated_schema, - "test-model", - "0.3.0", - config=BigQueryConfig( - project=bq_project, - dataset=bq_dataset, - ttl_days=14, - ), - ) - version_update_inference_logs = migrated_inference_logs.copy() - version_update_inference_logs["model_version"] = "0.3.0" - version_update_bq_sink.write(version_update_inference_logs) - - table = client.get_table(f"{bq_project}.{bq_dataset}.prediction_log_test_model") - assert table.schema == [ - SchemaField(name="session_id", field_type="STRING"), - SchemaField(name="row_id", field_type="STRING"), - SchemaField(name="request_timestamp", field_type="TIMESTAMP"), - SchemaField(name="model_version", field_type="STRING"), - SchemaField(name="rating", field_type="FLOAT"), - SchemaField(name="prediction_score", field_type="FLOAT"), - SchemaField(name="_prediction_label", field_type="STRING"), - SchemaField(name="rating_v2", field_type="FLOAT"), - ] - df = client.query( - "SELECT * FROM `{}.{}.prediction_log_test_model`".format(bq_project, bq_dataset) - ).to_dataframe() - df.reset_index(drop=True, inplace=True) - event_timestamp = datetime(2024, 1, 1, 0, 0, 0).astimezone(tz.UTC) - expected_df = pd.DataFrame.from_records( - [ - [0.8, 0.4, "1234", "a", event_timestamp, "0.1.0", "non fraud", None], - [0.5, 0.9, "1234", "b", event_timestamp, "0.1.0", "fraud", None], - [None, 0.4, "1234", "a", event_timestamp, "0.2.0", "non fraud", 0.8], - [None, 0.9, "1234", "b", event_timestamp, "0.2.0", "fraud", 0.5], - [None, 0.4, "1234", "a", event_timestamp, "0.3.0", "non fraud", 0.8], - [None, 0.9, "1234", "b", event_timestamp, "0.3.0", "fraud", 0.5], - ], - columns=[ - "rating", - "prediction_score", - "session_id", - "row_id", - "request_timestamp", - "model_version", - "_prediction_label", - "rating_v2", - ], - ) - expected_df.reset_index(drop=True, inplace=True) - assert_frame_equal(df, expected_df, check_like=True)