HomeArchiveBlog


Original contents are licensed under CC BY-NC 4.0. All rights reserved © 2026 Kai.
Back to Archives
FPGA HLS: Scheduling Algorithms

An overview of scheduling algorithms used in FPGA High-Level Synthesis (HLS) to optimize resource utilization and performance.

Sun Jan 04 2026
Tue Jan 06 2026
FPGAHLSSchedulingOptimization
On this page
  • FPGA HLS: Scheduling Algorithms
    • 什么是调度
    • ASAP 和 ALAP
      • 代码实现
    • 列表调度
      • 代码实现
    • 力导向调度
      • 代码实现
    • SDC 调度
    • 最后

FPGA HLS: Scheduling Algorithms

在 FPGA HLS 中, 由于高层次语言几乎没有办法描述硬件级的时序行为, 而硬件设计说白了就是在组合各种器件并让它们以设定的时序行为进行工作. 因此 HLS 工具最核心的一样工作就是调度 (Scheduling), 即根据设计的行为描述, 以及用户给定的约束条件, 将每一个原子操作分配到具体的时钟周期中去执行. 如何调度直接影响了最终设计的 PPA, 本文介绍 FPGA HLS 中常见的一些调度算法.

什么是调度

再次强调, 硬件设计的本质就是在组合各种器件并让它们以设定的时序行为进行工作. 给一段简单的 C 代码 a += b;, 它会被拆成几个原子操作, 从寄存器读取 a 和 b, 加法器执行加法 (假设单周期), 将结果写回寄存器 a. 所谓原子操作指的就是不可再分的最小操作单元, 例如寄存器读写, 加法器加法等. 对于这个例子, 假设每一步都需要一个周期, 那么整条代码就会被映射到连续的 3 个时钟周期中去执行:

  • 周期 1: 读取寄存器 a 和 b
  • 周期 2: 执行加法
  • 周期 3: 将结果写回寄存器 a

分析高层的原始代码, 确定每一个原子操作所在的执行周期, 这就是调度的工作. 实际情况是非常复杂的, 广泛存在的操作依赖关系, 以及有限的硬件资源是调度需要考虑的两个主要因素.

ASAP 和 ALAP

首先是最基础的两种调度算法 ASAP (As Soon As Possible) 和 ALAP (As Late As Possible). 这两种算法都不考虑任何资源限制, 简单快速地给出一个调度结果.

ASAP 的核心思想是, 对于每一个原子操作, 都尽可能早地安排它的执行时间, 只要它所依赖的所有前驱操作都已经完成了. 例如下面的代码:

c = a + b;
d += c;

会被调度成

在这种算法下, 注意 LD d, 也就是加载 d 的操作会被直接安排到第一个周期, 因为它没有任何前驱依赖.

ALAP 则相反, 它在不影响总体调度长度的前提下, 尽可能晚地安排每一个原子操作的执行时间. 例如上面的代码会被调度成

在这种算法下, 注意 LD d, 则会被安排到第三个周期, 因为后续的 ADD d 依赖它, 在不影响总体调度长度的前提下, 它可以被放在第 1-3 周期中的任何一个周期, 但 ALAP 选择了最晚的那个.

看起来 ALAP 会更好, 因为 LD d 早开始了也没用, 反而由于要同时做三次加载操作, 可能还占用更多的储存器端口. 但实际上无论是 ALAP 和 ASAP 单独存在的实用性都是几乎没有的, 它们更多作为整个调度算法中的某一个子步骤存在. 所以讨论 ASAP 和 ALAP 的意义更多在于理解调度的基本概念和依赖关系.

代码实现

尽管如此, 我们还是给出一段用 C++ 实现 ASAP/ALAP 调度的代码示例, 它首先构建所有操作的依赖图, 然后根据依赖关系进行调度. 我们会注意到, 实际上 ALAP 的正确实现依赖于 ASAP 的结果.

我们首先定义一个基本的调度器框架, 后续实现的所有调度算法都可以基于这个框架进行扩展, 每个算法的实现都应该是非破坏性的, 即不会修改原始的依赖图结构.

enum class ResourceType {
    ADDER,
    MULTIPLIER,
    NONE
};
using RT = ResourceType;
// A helper function to get the resource type of an atomic operation
static RT get_op_resource(Operation *op) { /* ... */ }
// A helper function to get the duration of an atomic operation
static int get_op_duration(Operation *op) { /* ... */ }

struct Node {
    Operation *op;
    int duration;
    int scheduled_time = -1; // -1 means not scheduled yet
    std::vector<Node*> preds;
    std::vector<Node*> succs;
    RT resource;
    explicit Node(Operation *op) : op(op), 
                                   duration(get_op_duration(op)), 
                                   resource(get_op_resource(op)) {}
};

class Scheduler {
public:
    Scheduler(const std::vector<Operation*> &ops) {
        for (auto op : ops) {
            auto node = add_node(op);
            node_map[op] = node;
        }
        build_dependencies();
    }
    // schedule and return the total latency
    int schedule_asap();
    void schedule_alap(int max_cycles);
    void dump_schedule() const;
    void reset_schedule();

private:
    void add_edge(Node *from, Node *to) {
        from->succs.push_back(to);
        to->preds.push_back(from);
    }
    Node *add_node(Operation *op) {
        auto node = std::make_unique<Node>(op);
        Node *ptr = node.get();
        nodes.push_back(std::move(node));
        return ptr;
    }
    void build_dependencies();
    std::vector<std::unique_ptr<Node>> nodes; // vector to hold all nodes
    std::unordered_map<Operation*, Node*> node_map; // map to accelerate lookup
};

void Scheduler::build_dependencies() {
    for (const auto &n : nodes) {
        for (auto operand : n->op->getOperands()) {
            Operation *defOp = operand->getDefiningOp();
            if (defOp && node_map.contains(defOp))
                add_edge(node_map[defOp], n.get());
        }
    }
}

void Scheduler::dump_schedule() const {
    for (const auto &n : nodes)
        std::println("Operation {} scheduled at time {}", n->op->getName(), n->scheduled_time);
}

void Scheduler::reset_schedule() {
    for (auto &n : nodes)
        n->scheduled_time = -1;
}

接着是 ASAP 的实现:

int Scheduler::schedule_asap() {
    std::queue<Node*> q;
    // 初始化队列, 将所有没有前驱依赖的节点(源节点)加入队列
    for (const auto &n : nodes) {
        if (n->preds.empty()) {
            n->scheduled_time = 0;
            q.push(n.get());
        }
    }
    // 记录每个节点的入度. 我们不修改原图, 因此需要一个额外的数据结构来记录入度
    std::unordered_map<Node*, int> in_degree;
    for (const auto &n : nodes)
        in_degree[n.get()] = n->preds.size();
    
    // 开始调度
    int max_cycles = 0;
    while (!q.empty()) {
        Node *u = q.front();
        q.pop();
        // 遍历所有后继节点, 开始时间不可能早于当前节点的结束时间
        for (Node *v : u->succs) {
            int finish_time = u->scheduled_time + u->duration;
            v->scheduled_time = std::max(v->scheduled_time, finish_time);
            --in_degree[v];
            // 如果入度变为 0, 说明该节点成为新子图的源节点, 加入队列
            if (in_degree[v] == 0) {
                q.push(v);
            }
            max_cycles = std::max(max_cycles, v->scheduled_time + v->duration);
        }
    }
    return max_cycles;
}

ALAP 的实现如下:

void Scheduler::schedule_alap(int max_cycles) {
    std::unordered_map<Node*, int> out_degree;
    std::queue<Node*> q;
    // 初始化队列, 将所有没有后继依赖的节点(汇节点)加入队列
    for (const auto &n : nodes) {
        out_degree[n.get()] = n->succs.size();
        if (n->succs.empty()) {
            n->scheduled_time = max_cycles - n->duration;
            q.push(n.get());
        } else {
            n->scheduled_time = 1e9; // 初始化为一个大值
        }
    }
    // 开始调度
    while (!q.empty()) {
        Node *u = q.front();
        q.pop();
        // 遍历所有前驱节点, 结束时间不可能晚于当前节点的开始时间
        for (auto v : u->preds) {
            int start_time = u->scheduled_time - v->duration;
            v->scheduled_time = std::min(v->scheduled_time, start_time);
            if (v->scheduled_time < 0)
                throw std::runtime_error("Scheduling failed: cannot satisfy the max cycle constraint");
            --out_degree[v];
            // 如果出度变为 0, 说明该节点成为新子图的汇节点, 加入队列
            if (out_degree[v] == 0) {
                q.push(v);
            }
        }
    }
}
// Example usage
std::vector<Operation*> ops = /* ... */;
Scheduler scheduler(ops);
int cycles = scheduler.schedule_asap();
scheduler.dump_schedule();
scheduler.schedule_alap(cycles);
scheduler.dump_schedule();

可以看到 ASAP 实际上是一个对依赖图进行正序遍历的过程, 而 ALAP 算法则是一个对依赖图进行逆序遍历的过程. ALAP 实际上依赖于给定的最大延迟参数, 它靠这个参数倒推每一个节点的最晚开始时间. 因此必须给定一个正确的最大延迟参数, 才能保证 ALAP 的正确性. 注意到 ASAP 算法的返回值实际上就是找出了原图中的关键路径 (Critical Path), 给出的结果就是整个设计的最小延迟, 因此可以使用 ASAP 的结果作为 ALAP 的最大延迟参数.

这两个算法的时间复杂度都是 O(V+E), 速度非常快, 但问题在于没有考虑任何资源限制.

列表调度

列表调度 (List Scheduling) 是一种启发式调度算法, 它是基于 ASAP/ALAP 的结果进行进一步调整的. 核心思想是为每一个原子操作分配一个优先级 (通常基于 ALAP 结果), 并且维护一个就绪队列 (Ready Queue), 其中包含所有已经满足前驱依赖且等待调度的操作. 然后在每一个时钟周期中, 从就绪队列中选择优先级最高的操作进行调度, 直到资源被耗尽为止.

列表调度需要逐周期地模拟调度过程, 因此时间复杂度还会包含一个周期数因子, 大概为 O(T*(V+E)), 其中 T 是总的调度周期数. 它实际上是一种贪心算法, 每次都选择当前最优的操作进行调度, 但并不保证全局最优.

代码实现

列表调度的优先级判断可以有很多种标准, 这里我们使用 ALAP 结果作为优先级依据, 这样判断的原因是, ALAP 越晚的操作越应该被优先调度, 这样可以避免后续操作被阻塞. 具体实现上, 可以直接选择 priority_queue 来快速建立就绪队列. 我们模拟一个稍微具体一些的情景, 只考虑加法和乘法需要的额外硬件资源, 并且假设每个周期只能使用有限数量的加法器和乘法器.

我们先扩展一下之前的调度器框架, 添加一些资源限制的参数:

class Scheduler {
public:
    Scheduler(...); // update constructor as needed
    int schedule_list();
    void set_num_adders(int n) { num_adds = n; }
    void set_num_multipliers(int n) { num_mults = n; }

private:
    int num_adds;
    int num_mults;
};

然后是列表调度的实现:

int Scheduler::schedule_list() {
    // 记录每个节点的入度
    std::unordered_map<Node*, int> in_degree;
    for (const auto &n : nodes)
        in_degree[n.get()] = n->preds.size();

    auto cmp = [](Node *a, Node *b) {
        // alap must be performed before list scheduling
        assert(a->scheduled_time != -1 && b->scheduled_time != -1);
        return a->scheduled_time < b->scheduled_time; // ALAP-based priority
    };
    std::priority_queue<Node*, std::vector<Node*>, decltype(cmp)> ready_queue(cmp);

    // 初始化就绪队列
    for (const auto &n : nodes) {
        if (in_degree[n.get()] == 0)
            ready_queue.push(n.get());
    }

    // 开始调度
    int curr_time = 0;
    int scheduled_count = 0;
    int total_nodes = nodes.size();
    // 追踪多周期节点
    std::vector<Node*> active_nodes;
    while (scheduled_count < total_nodes) {
        // 检查当前周期结束的节点, 将它们从 active_nodes 中释放
        auto it = active_nodes.begin();
        while (it != active_nodes.end()) { // NOTE: 注意迭代器失效问题
            Node *n = *it;
            if (curr_time >= n->scheduled_time + n->duration) {
                for (Node *succ : n->succs) {
                    --in_degree[succ];
                    if (in_degree[succ] == 0)
                        ready_queue.push(succ);
                }
                it = active_nodes.erase(it);
            } else
                ++it;
        }
        // 计算当前周期未被占用的加法器和乘法器数量
        int avail_adds = num_adds;
        int avail_mults = num_mults;
        for (Node *n : active_nodes) {
            if (n->resource == RT::ADDER) --avail_adds;
            else if (n->resource == RT::MULTIPLIER) --avail_mults;
        }
        // 临时存储当前周期未能调度的节点
        std::vector<Node*> deferred_nodes;
        while (!ready_queue.empty()) {
            Node *n = ready_queue.top();
            ready_queue.pop();
            bool can_schedule = false;
            if (n->resource == RT::ADDER && avail_adds > 0) {
                --avail_adds;
                can_schedule = true;
            } else if (n->resource == RT::MULTIPLIER && avail_mults > 0) {
                --avail_mults;
                can_schedule = true;
            } else if (n->resource == RT::NONE) {
                // 普通操作不受资源限制
                can_schedule = true;
            }
            if (can_schedule) {
                n->scheduled_time = curr_time;
                ++scheduled_count;
                active_nodes.push_back(n);
            } else {
                // 资源不足, 延后调度
                deferred_nodes.push_back(n);
            }
        }
        // 将未能调度的节点重新加入就绪队列
        for (Node *n : deferred_nodes)
            ready_queue.push(n);
        ++curr_time;
        // 防止死循环
        if (curr_time > 10000) {
            throw std::runtime_error("Scheduling failed: too many cycles");
        }
    }
    return curr_time;
}
// Example usage
Scheduler scheduler(ops);
int cycles = scheduler.schedule_asap();
scheduler.schedule_alap(cycles);
scheduler.set_num_adders(2); // 设置加法器数量
scheduler.set_num_multipliers(1); // 设置乘法器数量
int final_cycles = scheduler.schedule_list();
scheduler.dump_schedule();

整个算法过程分为几个步骤:

  1. 初始化就绪队列, 将所有没有前驱依赖的节点加入队列.
  2. 逐周期地模拟调度过程, 在每个周期中:
    • 检查当前周期结束的节点, 释放它们的后继节点
    • 考虑仍未结束的多周期节点 (它们仍然占用资源), 计算当前周期可用的资源数量
    • 从就绪队列中选择优先级最高的节点进行调度, 直到资源被耗尽为止
    • 更新周期计数器, 进入下一个周期
  3. 重复上述过程, 直到所有节点都被调度完成.

这里面假设所有的 primitives, 也就是加法器和乘法器都是 blocking (阻塞的), 一旦开始执行, 就会一直占用资源直到完成 (和流水线化的资源相反). 这种假设相对简化了调度过程, 不过在使用 List Scheduling 的场景下做出这个假设是比较常见的.

力导向调度

列表调度是在给定资源限制的情况下, 尝试最小化总体调度长度的一种贪心算法. 另一种算法力导向调度 (Force-Directed Scheduling, FDS) 则是在给定时间约束的情况下, 尝试平滑资源使用率的一种算法, 也就是尝试将资源使用情况尽可能平铺在所有周期中, 以避免某些周期资源过载而某些周期资源闲置的情况.

FDS 参考了物理里面 "弹力" 的概念, 核心思想是如果多个操作在同一个周期内竞争同一资源, 那么它们之间会产生一种 "排斥力", 促使它们分散到不同的周期中去执行. 具体的实现过程上, 分为以下几步:

  1. 首先跑一遍 ASAP 和 ALAP, 得到每一个操作的最早和最晚执行时间, 这称为每个操作的时间框 (Time Frame).
  2. 根据时间框, 计算每个周期内的"分布密度图" (Distribution Graph), 例如一个操作如果可以在周期 2-4 之间执行, 那么它会对周期 2, 3, 4 都贡献 1/3 的概率密度.
  3. 计算如果将某个操作调度到某个周期, 会对整体资源使用情况产生多大的影响, 这就是所谓的 "力" (Force).
  4. 选择力最小的操作和周期进行调度, 并更新分布密度图.
  5. 重复上述过程, 直到所有操作都被调度完成.

关键在于怎么计算 DG 和力. DG 的计算比较简单, 对于已经被调度的操作, 它会在其执行的所有周期内各贡献 1; 对于未调度的操作, 它会在其时间框内的每个周期贡献相应的概率.

力的计算则分为两部分: 自力 (Self Force) 和间接力 (Indirect Force). 自力指的是将某个操作调度到某个周期时, 对从这个周期开始直到其持续时间结束的所有周期内的 DG 产生的影响之和. 间接力则指由于将某个操作调度到某个周期, 导致其前驱和后继操作的时间框发生变化, 进而影响它们的 DG 所产生的影响之和.

这两种力的计算都涉及到计算平均力 (Average Force). 给定一个时间框, 平均力指的是在这个时间框内, 操作从不同周期开始执行时, 对 DG 产生的平均影响. 计算公式为:

Favg(i,T1,T2)=1T2−T1+1∑t=T1T2∑d=tt+Di−1DG(d)F_\text{avg}(i, T_1, T_2) = \frac{1}{T_2 - T_1 + 1} \sum_{t=T_1}^{T_2} \sum_{d=t}^{t+D_i-1} DG(d) Favg​(i,T1​,T2​)=T2​−T1​+11​t=T1​∑T2​​d=t∑t+Di​−1​DG(d)

其中 iii 是操作对应的下标, DiD_iDi​ 是这个操作的持续时间. 借助这个公式, 自力的计算公式就可以表示为

Fself(i,T)=∑t=TT+Di−1DG(t)−Favg(ASAPi,ALAPi)F_\text{self}(i, T) = \sum_{t=T}^{T+D_i-1} DG(t) - F_\text{avg}(\text{ASAP}_i, \text{ALAP}_i)Fself​(i,T)=t=T∑T+Di​−1​DG(t)−Favg​(ASAPi​,ALAPi​)

其中 TTT 是操作 iii 被调度到的周期, ASAPi\text{ASAP}_iASAPi​ 和 ALAPi\text{ALAP}_iALAPi​ 分别是操作 iii 的最早和最晚执行时间.

计算间接力则会稍微复杂一些, 需要先进行约束传播, 也就是模拟将操作 iii 调度到周期 TTT 后, 它的前驱和后继操作的时间框会发生怎样的变化. 这实际上是一个不动点迭代的过程 (不动点包括操作自身和已经被调度的操作), 直到所有操作的时间框都不再变化为止. 然后对于每一个受影响的操作 jjj, 计算它的新平均力和旧平均力之差, 并将这些差值累加起来, 就得到了间接力:

Find(i,T)=∑j∈affected(Favg(j,ASAPj′,ALAPj′)−Favg(j,ASAPj,ALAPj))F_\text{ind}(i, T) = \sum_{j \in \text{affected}} \left( F_\text{avg}(j, \text{ASAP}_j', \text{ALAP}_j') - F_\text{avg}(j, \text{ASAP}_j, \text{ALAP}_j) \right)Find​(i,T)=j∈affected∑​(Favg​(j,ASAPj′​,ALAPj′​)−Favg​(j,ASAPj​,ALAPj​))

最终我们认为总力等于自力和间接力之和, 也就是将操作 iii 调度到周期 TTT 时的总影响:

Ftotal(i,T)=Fself(i,T)+Find(i,T)F_\text{total}(i, T) = F_\text{self}(i, T) + F_\text{ind}(i, T)Ftotal​(i,T)=Fself​(i,T)+Find​(i,T)

对每一个操作, 计算它在其时间框内每个周期的总力, 选择总力最小的操作和周期进行调度. 重复上述过程, 直到所有操作都被调度完成.

最后是一个细节, 对于不考虑资源限制的操作 (也就是假设无限资源), 它们对 DG 的贡献是零, 也就是它们不参与 DG 计算. 同样在约束传播后计算间接力的时候, 它们对间接力的贡献也是零. 但它们必须参与约束传播, 因为它们的调度时间会影响那些占用资源的操作的时间框.

通常在有多种资源类型的情况下, 我们会为每一种资源类型分别计算 DG 和力, 然后将它们加权考虑. 这里为了简化, 我们把加法器和乘法器看成同一种资源类型 (例如都使用 DSP 计算), 只计算一种 DG 和力.

代码实现

为了实现力导向调度, 我们首先需要扩展调度器框架, 添加分布密度图和力计算的相关方法:

struct TimeFrame {
    int asap;
    int alap;
};

class Scheduler {
public:
    Scheduler(...); // update constructor as needed
    void schedule_fds(int max_cycles);
private:
    void compute_time_frames(std::unordered_map<Node*, TimeFrame> &frames);
    std::vector<int> dsp_graph; // distribution graph for DSP resources
};

static void compute_distribution_graph(int cycles, 
                                       std::unordered_map<Node*, TimeFrame> &frames, 
                                       std::vector<double> &dg);
static double compute_avg_force(Node *node, int asap, int alap, const std::vector<double> &dg);
static double compute_self_force(Node *node, const TimeFrame &tf, int time_slot, const std::vector<double> &dg);
static void propagate_time_frames(std::unordered_map<Node*, TimeFrame> &frames, Node *node);
static double compute_indirect_force(Node *node, const std::unordered_map<Node*, TimeFrame> &frames, 
                              int time_slot, const std::vector<double> &dg);

然后是力导向调度的实现. 我们首先实现计算时间框的函数, 它需要先进行 ASAP 计算, 然后进行 ALAP 计算. 由于前面的 ASAP, ALAP 会修改节点的调度时间, 因此我们需要使用一个额外的数据结构来存储时间框信息. 并且 FDS 在每个调度迭代中都要重新计算时间框, 并且要考虑已经被调度的节点. 总之复用上面的 ASAP/ALAP 实现是不合适的. 我们在 compute_time_frames 中重新实现 ASAP 和 ALAP 的计算逻辑:

void Scheduler::compute_time_frames(std::unordered_map<Node*, TimeFrame> &frames) {
    frames.clear();
    // perform asap first
    std::queue<Node*> q;
    for (const auto &n : nodes) {
        Node *pn = n.get();
        if (n->scheduled_time != -1) { // 固定已调度节点
            frames[pn] = {n->scheduled_time, n->scheduled_time};
            q.push(pn);
        } else if (n->preds.empty()) {
            frames[pn] = {0, 1e9}; // 源节点 ASAP = 0
            q.push(pn);
        } else {
            frames[pn] = {0, 1e9}; // 初始化时间框
        }
    }
    // ASAP 计算
    int max_cycles = 0;
    while (!q.empty()) {
        Node *u = q.front();
        q.pop();
        for (auto v : u->succs) {
            if (v->scheduled_time != -1)
                continue; // 已经调度的节点不变
            int new_asap = frames[u].asap + u->duration;
            if (new_asap > frames[v].asap) {
                frames[v].asap = new_asap;
                q.push(v);
            }
            max_cycles = std::max(max_cycles, frames[v].asap + v->duration);
        }
    }
    // ALAP 计算
    std::queue<Node*> rq;
    for (const auto &n : nodes) {
        Node *pn = n.get();
        if (n->scheduled_time != -1) { // 固定已调度节点
            rq.push(pn);
        } else if (n->succs.empty()) {
            frames[pn].alap = max_cycles - n->duration; // 汇节点 ALAP = max_cycles - duration
            rq.push(pn);
        } else {
            frames[pn].alap = max_cycles; // 初始化 ALAP
        }
    }
    while (!rq.empty()) {
        Node *u = rq.front();
        rq.pop();
        for (auto v : u->preds) {
            if (v->scheduled_time != -1)
                continue; // 已经调度的节点不变
            int new_alap = frames[u].alap - v->duration;
            if (new_alap < frames[v].alap) {
                frames[v].alap = new_alap;
                rq.push(v);
            }
            if (frames[v].alap < frames[v].asap)
                throw std::runtime_error("Scheduling failed: no feasible time frame");
        }
    }
}

接着是计算分布密度图的函数, 完全按照上面的描述实现即可:

void compute_distribution_graph(int cycles, std::unordered_map<Node*, TimeFrame> &frames, std::vector<double> &dg) {
    dg.resize(cycles, 0.0);
    for (const auto &[n, tf] : frames) {
        if (n->resource == RT::NONE)
            continue; // 只考虑受资源限制的操作
        if (n->scheduled_time != -1) {
            // 已经被调度的节点, 在其持续的所有周期内各贡献 1
            for (int t = n->scheduled_time; t < n->scheduled_time + n->duration; ++t) {
                if (t < cycles)
                    dg[t] += 1.0;
            }
            continue;
        }
        // 计算时间框内的概率分布
        int length = tf.alap - tf.asap + 1;
        double prob = 1.0 / length;
        // 在时间框内的每个周期贡献相应的概率
        for (int t = tf.asap; t <= tf.alap; ++t) {
            if (t < cycles)
                dg[t] += prob;
        }
    }
    return dg;
}

接着是计算各种力的辅助函数:

double compute_avg_force(Node *node, int asap, int alap, 
                                   const std::vector<double> &dg) {
    int length = alap - asap + 1;
    double total_force = 0.0;
    // 计算节点在特定时间框内的平均力贡献
    for (int t = asap; t <= alap; ++t) {
        for (int d = t; d < t + node->duration; ++d) {
            if (d < dg.size())
                total_force += dg[d];
        }
    }
    return total_force / length;
}

double compute_self_force(Node *node, const TimeFrame &tf, 
                                    int time_slot, const std::vector<double> &dg) {
    // 计算将节点调度到 time_slot 时的力
    double force = 0.0;
    for (int d = time_slot; d < time_slot + node->duration; ++d) {
        if (d < dg.size())
            force += dg[d];
    }
    double avg_force = compute_avg_force(node, tf.asap, tf.alap, dg);
    return force - avg_force;
}

void propagate_time_frames(std::unordered_map<Node*, TimeFrame> &frames, Node *node) {
    // 约束传播函数, 更新前驱和后继节点的时间框
    std::queue<Node*> q;
    q.push(node);
    while (!q.empty()) {
        Node *n = q.front();
        q.pop();
        TimeFrame &tf = frames[n];
        // 更新后继节点的 ASAP
        for (auto succ : n->succs) {
            if (succ->scheduled_time != -1)
                continue; // 已经调度的节点不变
            int new_asap = tf.asap + n->duration;
            if (new_asap > frames[succ].asap) {
                frames[succ].asap = new_asap;
                q.push(succ);
            }
        }
        // 更新前驱节点的 ALAP
        for (auto pred : n->preds) {
            if (pred->scheduled_time != -1)
                continue; // 已经调度的节点不变
            int new_alap = tf.alap - pred->duration;
            if (new_alap < frames[pred].alap) {
                frames[pred].alap = new_alap;
                q.push(pred);
            }
        }
    }
}

double compute_indirect_force(Node *node, const std::unordered_map<Node*, TimeFrame> &frames, 
                                        int time_slot, const std::vector<double> &dg) {
    double indirect_force = 0.0;
    // 创建一个副本来模拟调度
    std::unordered_map<Node*, TimeFrame> temp_frames = frames;
    temp_frames[node] = {time_slot, time_slot}; // 模拟调度到 time_slot
    // 约束传播
    propagate_time_frames(temp_frames, node);
    // 计算间接力
    for (const auto &[n, tf] : temp_frames) {
        // 节点自身和已经调度的节点不计算间接力
        if (n == node || n->scheduled_time != -1)
            continue;
        if (n->resource == RT::NONE)
            continue; // 只考虑受资源限制的操作
        if (temp_frames[n].asap != frames[n].asap || temp_frames[n].alap != frames[n].alap) {
            if (temp_frames[n].asap > temp_frames[n].alap)
                return 1e12; // 不可调度, 返回一个大值
            // 计算新旧平均力之差
            double old_avg = compute_avg_force(n, frames[n].asap, frames[n].alap, dg);
            double new_avg = compute_avg_force(n, temp_frames[n].asap, temp_frames[n].alap, dg);
            indirect_force += (new_avg - old_avg);
        }
    }
    return indirect_force;
}

最后是调度器的主调度函数, 它会反复计算时间框和 DG, 并选择力最小的节点进行调度:

void Scheduler::schedule_fds(int max_cycles) {
    dsp_graph.resize(max_cycles, 0);
    std::unordered_map<Node*, TimeFrame> frames;
    std::vector<double> dg(max_cycles);

    int scheduled_count = 0;
    int total_nodes = nodes.size();
    while (scheduled_count < total_nodes) {
        compute_time_frames(frames);
        compute_distribution_graph(max_cycles, frames, dg);

        double min_force = 1e9; // 大数但小于 compute_avg_force 遇到非法调度时返回的值
        Node *best_node = nullptr;
        int best_time_slot = -1;

        for (const auto &n : nodes) {
            Node *pn = n.get();
            if (n->scheduled_time != -1)
                continue; // 已经调度的节点跳过
            TimeFrame &tf = frames[pn];
            for (int t = tf.asap; t <= tf.alap; ++t) {
                double self_force = compute_self_force(pn, tf, t, dg);
                double indirect_force = compute_indirect_force(pn, frames, t, dg);
                double total_force = self_force + indirect_force;
                if (total_force < min_force) {
                    min_force = total_force;
                    best_node = pn;
                    best_time_slot = t;
                }
            }
        }
        // 调度最佳节点
        if (best_node) {
            best_node->scheduled_time = best_time_slot;
            ++scheduled_count;
        } else {
            throw std::runtime_error("Scheduling failed: cannot satisfy the max cycle constraint");
        }
    }
    // 如果成功调度, dg 储存的就是最终的资源使用图
    dsp_graph.clear();
    for (int i = 0; i < max_cycles; ++i) {
        assert((dg[i] - std::round(dg[i])) < 1e-6); // 应该是整数
        dsp_graph.push_back(static_cast<int>(dg[i]));
    }
}
// Example usage
Scheduler scheduler(ops);
int cycles = scheduler.schedule_asap();
double slack = 1.2; // 允许的延迟松弛比例
int max_cycles = static_cast<int>(cycles * slack);
scheduler.schedule_fds(max_cycles);
scheduler.dump_schedule();

和 ALAP 一样, FDS 也依赖一个时间约束, 也就是最大允许的调度周期数. 这个参数可以基于 ASAP 的结果加上一定 slack 来确定. ASAP 的结果给出的是无限资源下的最小调度长度, 直接把这个结果作为 FDS 的时间约束容易导致资源爆炸, 因此一个解决方法是允许一定比例的延迟松弛 (slack), 例如允许调度长度增加 20%, 这样允许 FDS 把资源平铺开来使用. 另一种方法是探索 Pareto 前沿, 画出一条资源随调度长度变化的曲线, 选择一个合适的折中点进行调度.

SDC 调度

SDC (System Dependency Constraint) 调度是目前商用级 FPGA 工具中 (Vitis HLS, Catapult HLS 等) 最常用的调度算法. 它将调度问题建模成一个线性规划 (Linear Programming, LP) 问题, 通过求解这个 LP 问题来得到最终的调度结果. 由于 LP 问题是多项式时间可解的, 因此 SDC 调度在理论上是高效且可行的.

SDC 算法的建模过程实际上非常暴力和直接, 但涉及到的数学知识就比较复杂了. 因为直接暴力建模出来的是一个 ILP 问题, 它是 NP-hard 的, 需要额外的技巧将其转化为一个 LP 问题. 这里我们不展开讨论 SDC 的数学细节, 只给出建模的过程.

  1. 对于每一个操作 iii, 定义一个非负整数变量 xix_ixi​, 表示操作 iii 的开始时间 (以周期为单位).
  2. 如果操作 iii 依赖于操作 jjj, 那么添加一个约束: xi≥xj+Djx_i \geq x_j + D_jxi​≥xj​+Dj​ 其中 DjD_jDj​ 是操作 jjj 的持续时间. 这是数据依赖约束. 例如, 如果 jjj 是一个纯组合逻辑, 那 Dj=0D_j=0Dj​=0, 如果 jjj 是一个加法, 可能 Dj=1D_j=1Dj​=1 (一个周期完成).
  3. 给定一个时钟频率, 和对应的周期时间长度 TclkT_\text{clk}Tclk​, 那一条路径上的组合延迟之和不能超过 TclkT_\text{clk}Tclk​. 假设有一条组合路径 i→j→⋯→ki\rightarrow j \rightarrow \dots \rightarrow ki→j→⋯→k, 总延迟为 C(i,k)C(i, k)C(i,k), 那么如果 C(i,k)>TclkC(i, k) > T_\text{clk}C(i,k)>Tclk​, 那么 iii 和 kkk 绝对不能在同一个周期内执行, 因此添加一个约束: xk−xi≥⌈C(i,k)+TsetupTclk⌉−1x_k - x_i \geq \left\lceil \frac{C(i, k) + T_\text{setup}}{T_\text{clk}} \right\rceil - 1 xk​−xi​≥⌈Tclk​C(i,k)+Tsetup​​⌉−1 其中 TsetupT_\text{setup}Tsetup​ 是寄存器的建立时间 (Setup Time). 这是时序约束. 换句话说, 只要路径延迟超过一个时钟周期, 那起点和终点的时间变量之间就必须至少相差一个周期. 这个约束可以通过静态时序分析 (STA) 工具来获得.
  4. 在流水化循环中, 如果给定了一个目标 II (Initiation Interval, II), 那对于同语句的不同迭代, 也需要添加类似的约束: xi(k+1)−xi(k)≥IIx_i^{(k+1)} - x_i^{(k)} \geq IIxi(k+1)​−xi(k)​≥II 其中 xi(k)x_i^{(k)}xi(k)​ 表示第 kkk 次迭代中操作 iii 的开始时间. 对于全局的总周期数, 需要通过 CDFG 找到所有的起点和终点, 并添加约束: xend−xstart≤Xx_\text{end} - x_\text{start} \leq Xxend​−xstart​≤X 其中 XXX 代表全局总周期数, 它应该大于所有全局起点-终点对的路径上的周期数. 这些约束统称为控制流约束.
  5. 最终的目标函数即为最小化总周期数: minimize⁡X\operatorname{minimize} XminimizeX

这里面没有显式地建模资源约束, 这是故意的. 观察以上的所有约束条件, 如果将它们写成矩阵乘法的形式, 即 Ax≤bAx \leq bAx≤b, 其中 xxx 是所有变量的向量, 那么矩阵 AAA 实际上是一个全 TU 矩阵 (Totally Unimodular Matrix), 也就是说 AAA 每行都至多有一个 1 和一个 -1, 其余元素都是 0. 在这种情况下, 尽管原始问题是一个 ILP 问题, 但它的 LP 松弛 (也就是将整数变量放宽为实数变量) 的解恰好也是整数解. 因此我们可以直接求解 LP 问题, 而不需要使用复杂的 ILP 求解器.

但如果加上资源约束, 那就纯纯是个 ILP 问题了, 因为资源约束会破坏 TU 矩阵的性质. 例如, 假设某种资源总量为 NRN_RNR​ 那么我们要这样建立约束: 为每一个操作 iii 添加一系列二进制变量 yi,ty_{i,t}yi,t​, 表示操作 iii 是否在周期 ttt 执行. 然后添加约束:

xi=∑tt⋅yi,tx_i = \sum_{t} t \cdot y_{i,t}xi​=t∑​t⋅yi,t​ ∑tyi,t=1\sum_{t} y_{i,t} = 1t∑​yi,t​=1

这表示每个操作不可能被多次开始. 然后添加资源约束:

∑i∈Ops∑k=0Di−1yi,t−k≤NR\sum_{i \in \text{Ops}} \sum_{k=0}^{D_i-1} y_{i,t-k} \leq N_Ri∈Ops∑​k=0∑Di​−1​yi,t−k​≤NR​

其中内层求和表示如果操作 iii 在周期 t−kt-kt−k 开始执行, 那么它会在周期 ttt 占用资源 (假设操作持续时间为 DiD_iDi​). 整个约束对所有操作求和, 也就表达了在周期 ttt 资源总量不能超过 NRN_RNR​. 暂且不提这个约束会引入一大堆辅助变量, 它直接破坏了 TU 矩阵的性质, 因为对于 xix_ixi​ 和 yi,ty_{i,t}yi,t​ 之间的关系 (第一条约束), ttt 显然可以取任意非负整数, 那系数矩阵里面直接会出现非常多大于 1 的元素.

那直接将资源约束加入原始建模就显得不切实际了, 实际过程中使用的方法是增量式SDC (Incremental SDC). 它的迭代过程如下:

  1. 首先只使用数据依赖约束, 时序约束和控制流约束建立 LP 问题, 求解得到一个初始调度结果.
  2. 检查资源是否存在超额使用的情况, 如果没有, 则调度完成.
  3. 如果存在资源超额使用的情况, 利用启发式算法给发生冲突的操作排一个优先级, 选出优先级最高的且满足资源数量限制的操作, 踢掉剩余的操作.
  4. 为被踢掉的操作添加新的差分约束, 形如 xj−xi≥Dix_j - x_i \geq D_ixj​−xi​≥Di​, 强制它们在不同周期执行. 这样仍然维持了 TU 矩阵的性质.
  5. 将这个约束加入 LP 问题的约束集中, 重新求解 LP 问题.
  6. 重复上述过程, 直到没有资源冲突为止, 或者达到最大迭代次数.

这就是最工业化的调度算法的基本思路了. 它能综合考量时序约束 (这在前面几个算法中是没有做到的) 和资源限制, 并且用 LP 求解器保证找到的解一定是全局最优解 (在给定约束下). 然后通过迭代式添加资源约束, 保证实际情况下的资源限制也能被满足. 显然这是一种最 robust 的调度算法, 多项式时间内可解且解质量非常高.

关于 SDC 调度的实现方法, 单纯建模还算比较简单 (除了 STA 不太好弄), 但由于涉及到 LP 求解器的使用, 例如 Gurobi, CPLEX 等等, 工程细节就比较麻烦了, 这里不做展示.

最后

实际上还有很多其他的调度算法, 比如纯 ILP 的暴力求解 (适用于极小规模的问题), 但由于适用范围有限, 这里就不展开讨论了. 关于流水线调度还有一个非常常用的算法, 即模调度 (Modulo Scheduling), 将会在 FPGA HLS: Pipelining 一章中进行介绍.