Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ ovms_cc_library(
"libovms_cliparser",
"libovms_systeminfo",
"ovms_exit_codes",
"//src/utils:env_guard",
],
visibility = ["//visibility:public",],
additional_copts = COPTS_DROGON,
Expand Down Expand Up @@ -1007,6 +1006,7 @@ ovms_cc_library(
hdrs = ["logging.hpp"],
srcs = ["logging.cpp"],
deps = [
"//src/utils:env_guard",
"@com_github_gabime_spdlog//:spdlog",
"@com_github_glog_glog//:glog", # used to manage Mediapipe logging
],
Expand Down
10 changes: 0 additions & 10 deletions src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "modelconfig.hpp"
#include "stringutils.hpp"
#include "systeminfo.hpp"
#include "utils/env_guard.hpp"

namespace ovms {

Expand Down Expand Up @@ -91,15 +90,6 @@ Config& Config::parse(int argc, char** argv) {
// Stores copies of the supplied server and model settings in this Config,
// applies process environment defaults through a shared EnvGuard, and
// returns the result of validate().
//
// serverSettings / modelsSettings: dereferenced unconditionally, so both
// must be non-null.
bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* modelsSettings) {
    this->serverSettings = *serverSettings;
    this->modelsSettings = *modelsSettings;

    // Function-local static: a single guard instance is shared by every call.
    // NOTE(review): EnvGuard presumably restores the variables when destroyed - confirm.
    static EnvGuard envGuard;
#if defined(__linux__) || defined(_WIN32)
    const auto& requestedLevel = this->serverSettings.logLevel;
    const bool verboseLogging = (requestedLevel == "DEBUG") || (requestedLevel == "TRACE");
    if (verboseLogging) {
        envGuard.set("OPENVINO_LOG_LEVEL", "4");
    }
#endif
    // Only provide a default when the variable is unset or empty, so a value
    // already present in the environment is left alone.
    const bool graphQueueFlagMissing = GetEnvVar("OVMS_GRAPH_QUEUE_OFF").empty();
    if (graphQueueFlagMissing) {
        envGuard.set("OVMS_GRAPH_QUEUE_OFF", "1");
    }
    return validate();
}

Expand Down
5 changes: 5 additions & 0 deletions src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#endif
#include <vector>

#include "src/utils/env_guard.hpp"

namespace ovms {

std::shared_ptr<spdlog::logger> gcs_logger = std::make_shared<spdlog::logger>("gcs");
Expand Down Expand Up @@ -163,6 +165,9 @@ void configure_logger(const std::string& log_level, const std::string& log_path)
FLAGS_minloglevel = google::GLOG_ERROR;
#endif
#endif
if (log_level == "DEBUG" || log_level == "TRACE") {
SetEnvironmentVar("OPENVINO_LOG_LEVEL", "4");
}
}

} // namespace ovms
113 changes: 77 additions & 36 deletions src/mediapipe_internal/graphqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,59 +51,100 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh
std::unordered_map<std::string, std::shared_ptr<ObserverHolder>> observers;
for (auto& name : config.output_stream()) {
std::string streamName = getStreamName(name);
auto holder = std::make_shared<ObserverHolder>();
holder->current = std::make_shared<NullOutputStreamObserver>();
observers[streamName] = holder;
auto observerHolder = std::make_shared<ObserverHolder>();
observerHolder->current = std::make_shared<NullOutputStreamObserver>();
observers[streamName] = observerHolder;
}

auto graphHelper = std::make_shared<GraphHelper>(std::move(observers));
graphHelper->graph = std::make_unique<::mediapipe::CalculatorGraph>();
graphHelper->currentTimestamp = ::mediapipe::Timestamp(0);

auto absStatus = graphHelper->graph->Initialize(config);
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString());
throw std::runtime_error(absStatus.ToString());
}
for (const auto& [streamName, holder] : graphHelper->outStreamObservers) {
// Lambda captures holder (shared_ptr) by value — safe regardless of map layout
absStatus = graphHelper->graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { return holder->current->handlePacket(packet); });
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph queue ObserveOutputStream failed: {}", absStatus.ToString());
throw std::runtime_error(absStatus.ToString());
}
}
for (const auto& [nodeName, _] : sidePacketMaps->genAiServableMap) {
graphHelper->genAiExecutionContextMap[nodeName] = std::make_shared<GenAiExecutionContextHolder>();
}
std::map<std::string, mediapipe::Packet> inputSidePackets;
buildInputSidePackets(inputSidePackets, *sidePacketMaps);
// Override execution context with per-graph instance
inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<GenAiExecutionContextMap>(graphHelper->genAiExecutionContextMap).At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE));
absStatus = graphHelper->graph->StartRun(inputSidePackets);
auto absStatus = graphHelper->initialize(config, *sidePacketMaps);
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString());
SPDLOG_ERROR("Graph queue initialization failed: {}", absStatus.ToString());
throw std::runtime_error(absStatus.ToString());
}
inferRequests.emplace_back(std::move(graphHelper));
this->inferRequests.emplace_back(std::move(graphHelper));
}
}
GraphQueue::~GraphQueue() {
for (auto& graphHelper : inferRequests) {
auto absStatus = graphHelper->graph->WaitUntilIdle();
// Best-effort teardown of the owned graph. Does nothing when no graph is held.
// Otherwise the steps run in a fixed order: wait until idle, close all packet
// sources, wait until done, then cancel. A failing step is only logged at
// debug level and never stops the remaining steps.
GraphHelper::~GraphHelper() {
    if (graph == nullptr) {
        return;
    }
    if (const auto idleStatus = graph->WaitUntilIdle(); !idleStatus.ok()) {
        SPDLOG_DEBUG("GraphHelper WaitUntilIdle error: {}", idleStatus.ToString());
    }
    if (const auto closeStatus = graph->CloseAllPacketSources(); !closeStatus.ok()) {
        SPDLOG_DEBUG("GraphHelper CloseAllPacketSources error: {}", closeStatus.ToString());
    }
    if (const auto doneStatus = graph->WaitUntilDone(); !doneStatus.ok()) {
        SPDLOG_DEBUG("GraphHelper WaitUntilDone error: {}", doneStatus.ToString());
    }
    graph->Cancel();
}

// Creates a fresh CalculatorGraph for this helper and starts it.
//
// Steps, in order:
//   1. replace `graph` with a new CalculatorGraph and reset `currentTimestamp`
//      to STARTING_TIMESTAMP_VALUE,
//   2. initialize the graph with `config`,
//   3. attach one observer callback per entry of `outStreamObservers`,
//   4. build the input side packets from `sidePacketMaps` and start the run.
//
// Returns absl::OkStatus() on success. On the first failing step the error is
// logged and that step's status is returned; later steps are not attempted.
absl::Status GraphHelper::initialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) {
    this->graph = std::make_unique<::mediapipe::CalculatorGraph>();
    this->currentTimestamp = ::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE);
    auto absStatus = this->graph->Initialize(config);
    if (!absStatus.ok()) {
        SPDLOG_ERROR("Graph initialize failed: {}", absStatus.ToString());
        return absStatus;
    }
    for (const auto& [streamName, observerHolder] : this->outStreamObservers) {
        // The callback captures the holder (a shared_ptr) by value, so it stays
        // valid independently of the map; each packet is forwarded to whatever
        // observer the holder currently points at.
        absStatus = this->graph->ObserveOutputStream(streamName, [observerHolder](const ::mediapipe::Packet& packet) -> absl::Status {
            return observerHolder->current->handlePacket(packet);
        });
        if (!absStatus.ok()) {
            SPDLOG_ERROR("Graph ObserveOutputStream failed: {}", absStatus.ToString());
            return absStatus;
        }
    }
    std::map<std::string, mediapipe::Packet> inputSidePackets;
    buildInputSidePackets(inputSidePackets, sidePacketMaps);
    // Override the execution-context side packet with this helper's own map.
    inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] =
        mediapipe::MakePacket<GenAiExecutionContextMap>(this->genAiExecutionContextMap)
            .At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE));
    absStatus = this->graph->StartRun(inputSidePackets);
    if (!absStatus.ok()) {
        SPDLOG_ERROR("Graph StartRun failed: {}", absStatus.ToString());
        return absStatus;
    }
    return absl::OkStatus();
}

void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A unit test that ensures failed graphs are reusable is missing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test

SPDLOG_DEBUG("Reinitializing graph after error");
// Tear down the old graph (best-effort, errors expected since graph is in bad state)
if (this->graph) {
auto absStatus = this->graph->CloseAllPacketSources();
Comment on lines +120 to +124
if (!absStatus.ok()) {
SPDLOG_DEBUG("Graph queue CloseAllPacketSources error: {}", absStatus.ToString());
SPDLOG_DEBUG("reinitialize: CloseAllPacketSources: {}", absStatus.ToString());
}
absStatus = graphHelper->graph->WaitUntilDone();
absStatus = this->graph->WaitUntilDone();
if (!absStatus.ok()) {
SPDLOG_DEBUG("Graph queue WaitUntilDone error: {}", absStatus.ToString());
SPDLOG_DEBUG("reinitialize: WaitUntilDone: {}", absStatus.ToString());
}
graphHelper->graph->Cancel();
graphHelper->graph.reset();
this->graph->Cancel();
}
// Reset observers to null sentinel
for (const auto& [streamName, observerHolder] : this->outStreamObservers) {
observerHolder->current = std::make_shared<NullOutputStreamObserver>();
}
// Reset execution contexts
for (auto& [nodeName, ctx] : this->genAiExecutionContextMap) {
ctx->reset();
}
auto absStatus = initialize(config, sidePacketMaps);
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph reinitialize failed: {}", absStatus.ToString());
return;
}
SPDLOG_DEBUG("Graph reinitialized successfully");
}
GraphQueue::~GraphQueue() = default;
} // namespace ovms
45 changes: 45 additions & 0 deletions src/mediapipe_internal/graphqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <atomic>
#include <condition_variable>
#include <exception>
#include <future>
#include <memory>
#include <mutex>
Expand All @@ -39,6 +40,8 @@
#pragma GCC diagnostic pop
#pragma warning(pop)

#include "src/logging.hpp"

#include "graph_executor_constants.hpp"
#include "graph_side_packets.hpp"
#include "outputstreamobserver.hpp"
Expand All @@ -65,7 +68,49 @@ struct GraphHelper {
genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)),
currentTimestamp(gh.currentTimestamp) {}
GraphHelper& operator=(GraphHelper&&) = delete;
~GraphHelper();
// Creates a fresh CalculatorGraph, initializes it with the config,
// wires up output stream observers, builds side packets and starts the run.
absl::Status initialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps);
// Tears down the current (errored) graph and rebuilds a fresh one
// with the same observers and side packets. Called when inference
// encounters a graph error to avoid returning a poisoned graph to the pool.
void reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps);
};

// RAII guard that reinitializes the graph if inference exits with an error.
// Construct before the first graph interaction (packet push). Call dismiss()
// on the success path. If not dismissed, the destructor rebuilds the graph
// so the next request from the pool gets a clean graph.
//
// The guard stores references only (no copies), so the helper, config and
// side packet maps passed to the constructor must outlive the guard.
// The guard is non-copyable.
class GraphReinitGuard {
// Graph wrapper to rebuild when the guard is destroyed without dismiss().
GraphHelper& helper;
// Arguments forwarded unchanged to GraphHelper::reinitialize().
const ::mediapipe::CalculatorGraphConfig& config;
const GraphSidePackets& sidePacketMaps;
// Set by dismiss(); when true the destructor does nothing.
bool dismissed = false;

public:
// Binds the guard to the given helper, config and side packets.
// Does not touch the graph.
GraphReinitGuard(GraphHelper& helper,
const ::mediapipe::CalculatorGraphConfig& config,
const GraphSidePackets& sidePacketMaps) :
helper(helper),
config(config),
sidePacketMaps(sidePacketMaps) {}
// Marks the guarded operation as successful so the destructor skips the rebuild.
void dismiss() { dismissed = true; }
// Unless dismissed, calls helper.reinitialize(config, sidePacketMaps).
// Any exception thrown by that call is caught and logged here, so nothing
// propagates out of the destructor.
~GraphReinitGuard() {
if (!dismissed) {
try {
helper.reinitialize(config, sidePacketMaps);
} catch (const std::exception& e) {
SPDLOG_ERROR("GraphReinitGuard: reinitialize threw: {}", e.what());
} catch (...) {
SPDLOG_ERROR("GraphReinitGuard: reinitialize threw unknown exception");
}
}
}
Comment on lines +99 to +109
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we could only log error anyway. Will add that.

// Copying is disabled.
GraphReinitGuard(const GraphReinitGuard&) = delete;
GraphReinitGuard& operator=(const GraphReinitGuard&) = delete;
};

// we need to keep Graph alive during MP reload hence shared_ptr
class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> {
std::shared_ptr<GraphSidePackets> sidePacketMaps;
Expand Down
10 changes: 8 additions & 2 deletions src/mediapipe_internal/mediapipegraphexecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ class MediapipeGraphExecutor {
return Status(StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR,
"Input side packets are not supported for graphs with queue enabled");
}
::mediapipe::CalculatorGraph& graph = this->guard->graph;
auto llmContextStatus = initializeLlmExecutionContexts(this->sidePacketMaps.genAiServableMap, this->guard->graphHelper->genAiExecutionContextMap);
if (!llmContextStatus.ok()) {
return llmContextStatus;
Expand All @@ -199,7 +198,10 @@ class MediapipeGraphExecutor {
guard->graphHelper->outStreamObservers.at(name)->current = std::make_shared<MyFunctor<RequestType, ResponseType>>(name, this->outputTypes.at(name), *this, *request, *response);
}

GraphReinitGuard reinitOnFailureGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps);

size_t numberOfPacketsCreated = 0;
::mediapipe::CalculatorGraph& graph = this->guard->graph;
auto ovms_status = createAndPushPacketsImpl(
std::shared_ptr<const RequestType>(request, [](const RequestType*) {}),
this->inputTypes,
Expand Down Expand Up @@ -227,6 +229,7 @@ class MediapipeGraphExecutor {
}
resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap);
MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code()));
reinitOnFailureGuard.dismiss();
// Increment timestamp for next request reusing this graph from the queue
this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1);
SPDLOG_DEBUG("Received all output stream packets for graph: {}", this->name);
Expand Down Expand Up @@ -363,7 +366,6 @@ class MediapipeGraphExecutor {
"Input side packets are not supported for graphs with queue enabled");
}
MetricGaugeGuard currentGraphs(this->mediapipeServableMetricReporter->currentGraphs.get());
::mediapipe::CalculatorGraph& graph = this->guard->graph;
auto llmContextStatus = initializeLlmExecutionContexts(this->sidePacketMaps.genAiServableMap, this->guard->graphHelper->genAiExecutionContextMap);
if (!llmContextStatus.ok()) {
return llmContextStatus;
Expand Down Expand Up @@ -393,7 +395,10 @@ class MediapipeGraphExecutor {
executionContext, this->mediapipeServableMetricReporter);
}

GraphReinitGuard reinitOnFailureGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps);

size_t numberOfPacketsCreated = 0;
::mediapipe::CalculatorGraph& graph = this->guard->graph;
{
OVMS_PROFILE_SCOPE("Mediapipe graph deserializing first request");
bool isSuccess = true;
Expand Down Expand Up @@ -450,6 +455,7 @@ class MediapipeGraphExecutor {
}
resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap);
MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code()));
reinitOnFailureGuard.dismiss();
// Increment timestamp for next request reusing this graph from the queue
this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1);
SPDLOG_DEBUG("Graph {}: Done streaming execution (queue path)", this->name);
Expand Down
1 change: 1 addition & 0 deletions src/test/mediapipe/calculators/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ cc_library(
"ovms_calculator.cc",
"ovms_image_input_calculator.cc",
"ovms_kfs_calculator.cc",
"error_on_negative_test_calculator.cpp",
"streaming_test_calculator.cpp",
"two_input_calculator.cpp",
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//*****************************************************************************
// Copyright 2026 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include <cstring>

#include <openvino/openvino.hpp>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include "mediapipe/framework/calculator_framework.h"
#pragma GCC diagnostic pop

namespace mediapipe {

// Test-only calculator with one ov::Tensor input stream and one ov::Tensor
// output stream (both addressed by index 0).
//
// Process() inspects the first element of the input tensor as a float:
//   - input not FP32 or without elements -> InvalidArgumentError,
//   - first element negative             -> InvalidArgumentError("Negative input value"),
//   - otherwise                          -> emits a byte-for-byte copy of the
//                                           input at the input timestamp.
class ErrorOnNegativeTestCalculator : public CalculatorBase {
public:
    // Declares the single ov::Tensor input and the single ov::Tensor output.
    static absl::Status GetContract(CalculatorContract* cc) {
        cc->Inputs().Index(0).Set<ov::Tensor>();
        cc->Outputs().Index(0).Set<ov::Tensor>();
        return absl::OkStatus();
    }
    // No per-run setup or teardown is needed.
    absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); }
    absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); }
    // Returns an error for invalid or negative input, otherwise forwards a copy.
    absl::Status Process(CalculatorContext* cc) final {
        ov::Tensor input = cc->Inputs().Index(0).Get<ov::Tensor>();
        // The sign check below reads element 0 as a float, so reject tensors
        // for which that read would be out of bounds or misinterpreted.
        if (input.get_element_type() != ov::element::f32 || input.get_size() == 0) {
            return absl::InvalidArgumentError("Expected non-empty FP32 input tensor");
        }
        if (static_cast<const float*>(input.data())[0] < 0.0f) {
            return absl::InvalidArgumentError("Negative input value");
        }
        // Copy into a freshly allocated tensor of the same type and shape
        // instead of forwarding the input object itself.
        ov::Tensor output(input.get_element_type(), input.get_shape());
        std::memcpy(output.data(), input.data(), input.get_byte_size());
        // The output stream receives the heap-allocated tensor as a raw pointer.
        cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp());
        return absl::OkStatus();
    }
};

REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator);
} // namespace mediapipe
Loading