Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef struct custom_scheduler_data_t {
reaction_t** executing_reactions;
lf_mutex_t* array_of_mutexes;
reaction_t*** triggered_reactions;
size_t* triggered_reactions_sizes; // Store queue sizes for bounds checking
volatile size_t next_reaction_level;
lf_semaphore_t* semaphore; // Signal the maximum number of worker threads that should
// be executing work at the same time. Initially 0.
Expand Down Expand Up @@ -76,8 +77,17 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
scheduler->indexes[reaction_level] = 0;
}
#endif
LF_ASSERT(reaction_level <= scheduler->max_reaction_level, "Reaction level %zu exceeds max %zu", reaction_level,
scheduler->max_reaction_level);

int reaction_q_level_index = lf_atomic_fetch_add((int*)&scheduler->indexes[reaction_level], 1);
assert(reaction_q_level_index >= 0);

LF_ASSERT(reaction_q_level_index >= 0, "Negative queue index %d at level %zu", reaction_q_level_index,
reaction_level);
LF_ASSERT((size_t)reaction_q_level_index < scheduler->custom_data->triggered_reactions_sizes[reaction_level],
"Queue overflow: index %d >= size %zu at level %zu", reaction_q_level_index,
scheduler->custom_data->triggered_reactions_sizes[reaction_level], reaction_level);

LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level,
reaction_q_level_index);
((reaction_t***)scheduler->custom_data->triggered_reactions)[reaction_level][reaction_q_level_index] = reaction;
Expand Down Expand Up @@ -254,6 +264,9 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
env->scheduler->custom_data->triggered_reactions =
(reaction_t***)calloc((env->scheduler->max_reaction_level + 1), sizeof(reaction_t**));

env->scheduler->custom_data->triggered_reactions_sizes =
(size_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(size_t));

env->scheduler->custom_data->array_of_mutexes =
(lf_mutex_t*)calloc((env->scheduler->max_reaction_level + 1), sizeof(lf_mutex_t));

Expand All @@ -272,6 +285,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
}
// Initialize the reaction vectors
env->scheduler->custom_data->triggered_reactions[i] = (reaction_t**)calloc(queue_size, sizeof(reaction_t*));
env->scheduler->custom_data->triggered_reactions_sizes[i] = queue_size;

LF_PRINT_DEBUG("Scheduler: Initialized vector of reactions for level %zu with size %zu", i, queue_size);

Expand All @@ -294,6 +308,7 @@ void lf_sched_free(lf_scheduler_t* scheduler) {
}
free(scheduler->custom_data->triggered_reactions);
}
free(scheduler->custom_data->triggered_reactions_sizes);
free(scheduler->custom_data->array_of_mutexes);
lf_semaphore_destroy(scheduler->custom_data->semaphore);
free(scheduler->custom_data);
Expand Down
Loading