用C语言实现WFQ(加权公平队列)算法,将这个项目编写为三个程序:“发送者”、“接收者”和“路由器”。使用FlowID=1、权重=1和数据包大小1024启动第一个发送者。几秒钟后,使用FlowID=2、权重=1和数据包大小512启动第二个发送者。再过几秒钟,使用FlowID=3、权重=2和数据包大小1024启动第三个发送者。
时间: 2024-03-14 22:47:25 浏览: 91
好的,以下是基于C语言实现WFQ算法的示例代码,供您参考:
发送者:
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#define DEST_IP "127.0.0.1"
#define DEST_PORT 8888
#define FLOW_ID1 1
#define WEIGHT1 1
#define FLOW_ID2 2
#define WEIGHT2 1
#define FLOW_ID3 3
#define WEIGHT3 2
/*
 * UDP sender: repeatedly sends a fixed-size datagram of the form
 * "<flow id>:000...0" to DEST_IP:DEST_PORT.
 *
 * The pause between datagrams is 1000/weight microseconds, so a larger
 * weight sends more often.
 *
 * Returns EXIT_FAILURE if the socket cannot be created or the header does
 * not fit in the packet; otherwise it loops forever.
 */
int main()
{
    char send_buf[1024];
    int packet_size = 1024;
    int flow_id = FLOW_ID1;
    int weight = WEIGHT1;
    struct sockaddr_in dest_addr;
    int header_len;
    int sockfd;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return EXIT_FAILURE;
    }

    memset(&dest_addr, 0, sizeof(dest_addr));
    dest_addr.sin_family = AF_INET;
    dest_addr.sin_addr.s_addr = inet_addr(DEST_IP);
    dest_addr.sin_port = htons(DEST_PORT);

    /* The datagram never changes, so build it once outside the loop. */
    memset(send_buf, '0', sizeof(send_buf));
    header_len = snprintf(send_buf, sizeof(send_buf), "%d:", flow_id);
    if (header_len < 0 || header_len >= packet_size) {
        fprintf(stderr, "flow id header does not fit in a %d-byte packet\n",
                packet_size);
        close(sockfd);
        return EXIT_FAILURE;
    }
    /* snprintf's terminating NUL replaced one filler byte; put it back. */
    send_buf[header_len] = '0';

    while (1) {
        /*
         * Send packet_size bytes explicitly. Using strlen() here would stop
         * at the first NUL and transmit only the short header.
         */
        if (sendto(sockfd, send_buf, (size_t)packet_size, 0,
                   (struct sockaddr *)&dest_addr, sizeof(dest_addr)) < 0) {
            /* Best effort: report the failure and keep sending. */
            perror("sendto");
        }
        usleep(1000 / weight);
    }

    close(sockfd);
    return 0;
}
```
接收者:
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define LOCAL_IP "127.0.0.1"
#define LOCAL_PORT 8888
/*
 * UDP receiver: binds to LOCAL_IP:LOCAL_PORT and prints every datagram it
 * receives as a line of text.
 *
 * Returns EXIT_FAILURE if the socket cannot be created or bound, or if
 * receiving fails; otherwise it loops forever.
 */
int main()
{
    /* One spare byte so a full 1024-byte datagram can still be terminated. */
    char recv_buf[1024 + 1];
    struct sockaddr_in local_addr;
    int sockfd;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return EXIT_FAILURE;
    }

    memset(&local_addr, 0, sizeof(local_addr));
    local_addr.sin_family = AF_INET;
    local_addr.sin_addr.s_addr = inet_addr(LOCAL_IP);
    local_addr.sin_port = htons(LOCAL_PORT);
    if (bind(sockfd, (struct sockaddr *)&local_addr, sizeof(local_addr)) < 0) {
        perror("bind");
        close(sockfd);
        return EXIT_FAILURE;
    }

    while (1) {
        ssize_t received = recvfrom(sockfd, recv_buf, sizeof(recv_buf) - 1, 0,
                                    NULL, NULL);
        if (received < 0) {
            perror("recvfrom");
            break;
        }
        /* Datagrams carry no terminator; add one before printing with %s. */
        recv_buf[received] = '\0';
        printf("%s\n", recv_buf);
    }

    close(sockfd);
    return EXIT_FAILURE;
}
```
路由器:
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#define LOCAL_IP "127.0.0.1"
#define LOCAL_PORT 8888
#define QUEUE_SIZE 100
#define FLOW_ID1 1
#define WEIGHT1 1
#define FLOW_ID2 2
#define WEIGHT2 1
#define FLOW_ID3 3
#define WEIGHT3 2
// One packet as stored in the router's queue.
typedef struct _packet {
float priority; // Priority value; intended to be derived from the flow's weight
char data[1024]; // Packet contents
} packet_t;
// Packet queue shared between threads and guarded by three semaphores.
typedef struct _queue {
packet_t *data; // Storage for the queued packets
int head; // Index of the front of the queue (next packet to remove)
int tail; // Index of the back of the queue (next slot to fill)
sem_t sem_mutex; // Mutual-exclusion semaphore protecting the queue
sem_t sem_space; // Semaphore tracking free space in the queue
sem_t sem_item; // Semaphore tracking items available in the queue
} queue_t;
queue_t g_queue; // The single global queue instance
/*
 * Destination of the sender threads. Defined here only if nothing earlier
 * in the file already provides it.
 */
#ifndef DEST_IP
#define DEST_IP "127.0.0.1"
#endif
#ifndef DEST_PORT
#define DEST_PORT 8888
#endif

/*
 * Sender thread: repeatedly sends a datagram of the form "<flow id>:000...0"
 * to DEST_IP:DEST_PORT.
 *
 * arg points to an int holding the packet size in bytes. The flow id and
 * weight are chosen from that size: 1024 -> flow 1, 512 -> flow 2, any
 * other size -> flow 3. The pause between datagrams is 1000/weight
 * microseconds.
 *
 * NOTE(review): because the flow is selected purely by packet size, a
 * sender that should be flow 3 with 1024-byte packets cannot be expressed
 * through this argument; it will be treated as flow 1.
 *
 * Returns NULL if the socket cannot be created or the header cannot be
 * formatted; otherwise it loops forever.
 */
void *send_thread(void *arg)
{
    char send_buf[1024];
    struct sockaddr_in dest_addr;
    int packet_size = *(int *)arg;
    int header_len;
    int flow_id;
    int weight;
    int sockfd;

    if (packet_size == 1024) {
        flow_id = FLOW_ID1;
        weight = WEIGHT1;
    } else if (packet_size == 512) {
        flow_id = FLOW_ID2;
        weight = WEIGHT2;
    } else {
        flow_id = FLOW_ID3;
        weight = WEIGHT3;
    }

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return NULL;
    }

    memset(&dest_addr, 0, sizeof(dest_addr));
    dest_addr.sin_family = AF_INET;
    dest_addr.sin_addr.s_addr = inet_addr(DEST_IP);
    dest_addr.sin_port = htons(DEST_PORT);

    /* The datagram never changes, so build it once outside the loop. */
    memset(send_buf, '0', sizeof(send_buf));
    header_len = snprintf(send_buf, sizeof(send_buf), "%d:", flow_id);
    if (header_len < 0 || (size_t)header_len >= sizeof(send_buf)) {
        fprintf(stderr, "send_thread: cannot format flow id header\n");
        close(sockfd);
        return NULL;
    }
    /* snprintf's terminating NUL replaced one filler byte; put it back. */
    send_buf[header_len] = '0';

    /* Keep the transmitted length between the header and the buffer size. */
    if (packet_size > (int)sizeof(send_buf)) {
        packet_size = (int)sizeof(send_buf);
    }
    if (packet_size < header_len) {
        packet_size = header_len;
    }

    while (1) {
        /*
         * Send packet_size bytes explicitly. Using strlen() here would stop
         * at the first NUL and transmit only the short header.
         */
        if (sendto(sockfd, send_buf, (size_t)packet_size, 0,
                   (struct sockaddr *)&dest_addr, sizeof(dest_addr)) < 0) {
            /* Best effort: report the failure and keep sending. */
            perror("sendto");
        }
        usleep(1000 / weight);
    }

    close(sockfd);
    return NULL;
}
/*
 * Parses one packet of the form "<flow id>:<payload>".
 *
 * data is a NUL-terminated string and is not modified. A NULL pointer, or
 * a string with no leading number, is ignored. When no ':' follows the
 * flow id, the payload is treated as absent.
 *
 * strtok() is deliberately not used: it keeps hidden process-wide state,
 * and this function is called from the queue-processing thread.
 */
void process_packet(char *data)
{
    char *after_id = NULL;
    const char *payload;
    long flow_id;

    if (data == NULL) {
        return;
    }

    flow_id = strtol(data, &after_id, 10);
    if (after_id == data) {
        /* No digits at the start: there is no flow id to act on. */
        return;
    }
    payload = (*after_id == ':') ? after_id + 1 : NULL;

    /* Packet-handling logic goes here. */
    (void)flow_id;
    (void)payload;
}
// 定义处理队列的线程函数
void *process_queue(void *arg)
{
packet_t packet;
while (1) {
sem_wait(&g_queue.sem_item);
sem_wait(&g_queue.sem_mutex);
memcpy(&packet, &g_queue.data[g_queue.head], sizeof(packet_t));
g_queue.head = (g_queue.head + 1) % QUEUE_SIZE;
sem_post(&g_queue.sem_mutex);
sem_post(&g_queue.sem_space);
process_packet(packet.data);
}
return NULL;
}
int main()
{
int sockfd;
struct sockaddr_in local_addr;
char recv_buf[1024];
pthread_t send_thread1, send_thread2, send_thread3, queue_thread;
packet_t packet;
int packet_size;
// 创建UDP套接字
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
// 绑定IP地址和端口号
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = inet_addr(LOCAL_IP);
local_addr.sin_port = htons(LOCAL_PORT);
bind(sockfd, (struct sockaddr *)&local_addr, sizeof(local_addr));
// 初始化队列
g_queue.data = (packet_t *)malloc(sizeof(packet_t) * QUEUE_SIZE);
g_queue.head = 0;
g_queue.tail = 0;
sem_init(&g_queue.sem_mutex, 0, 1);
sem_init(&g_queue.sem_space, 0, QUEUE_SIZE);
sem_init(&g_queue.sem_item, 0, 0);
// 创建发送者线程1
packet_size = 1024;
pthread_create(&send_thread1, NULL, send_thread, &packet_size);
// 等待一段时间再启动发送者线程2
sleep(3);
packet_size = 512;
pthread_create(&send_thread2, NULL, send_thread, &packet_size);
// 再等待一段时间再启动发送者线程3
sleep(3);
packet_size = 1024;
pthread_create(&send_thread3, NULL, send_thread, &packet_size);
// 创建处理队列的线程
pthread_create(&queue_thread, NULL, process_queue, NULL);
// 接收数据包并加入队列
while (1) {
memset(recv_buf, 0, sizeof(recv_buf));
recvfrom(sockfd, recv_buf, sizeof(recv_buf), 0, NULL, NULL);
packet.priority = 1.0 / atoi(strtok(recv_buf, ":"));
strcpy(packet.data, recv_buf);
sem_wait(&g_queue.sem_space);
sem_wait(&g_queue.sem_mutex);
memcpy(&g_queue.data[g_queue.tail], &packet, sizeof(packet_t));
g_queue.tail = (g_queue.tail + 1) % QUEUE_SIZE;
sem_post(&g_queue.sem_mutex);
sem_post(&g_queue.sem_item);
}
// 销毁队列
free(g_queue.data);
close(sockfd);
return 0;
}
```
需要注意的是,以上代码仅供参考,具体实现可能需要根据实际情况进行修改和优化。
阅读全文