外观
04.应用层协议设计与实现、应用层协议解析模块
约 3770 字大约 13 分钟
嵌入式数据结构NETLinux
2022-06-14
8、应用层协议设计与实现
① 问题:下面的代码输出什么?为什么

Receive: ABC
② 小知识
发送缓冲区
- 数据先进入发送缓冲区,之后由操作系统送往远端主机
接收缓冲区
- 远端数据被操作系统接收后放入接收缓冲区
- 之后应用程序从接收缓冲区读取数据

③ TCP应用编程中的“问题” 数据接收端无法知道数据的发送方式!!!

④ 网络编程中的期待
- 每次发送一条完整的消息,每次接收一条完整的消息
- 即使接收缓冲区中有多条消息,也不会出现消息粘连(粘包)
- 消息中涵盖了数据类型和数据长度等信息
⑤ 应用层协议设计
什么是协议?
- 协议是通信双方为数据交换而建立的规则、标准或约定的集合
协议对数据传输的作用
- 通信双方根据协议能够正确收发数据
- 通信双方根据协议能够解释数据的意义
⑥ 协议设计示例
目标:设计可用于数据传输的协议
完整消息包含
- 数据头:数据类型(即:数据区用途,固定长度)
- 数据长度:数据区长度(固定长度)
- 数据区:字节数据(变长区域)
⑦ 协议设计示例

因此:
消息至少12个字节(消息头+数据长度)
通过计算消息的总长度,能够避开数据粘连的问题

⑧ 编程实验:应用层协议设计与实现
message.c
#include "message.h"
#include <malloc.h>
#include <string.h>
Message* Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, const char* payload, unsigned int length)
{
Message* ret = malloc(sizeof(Message) + length);
if( ret )
{
ret->type = type;
ret->cmd = cmd;
ret->index = index;
ret->total = total;
ret->length = length;
if( payload )
{
memcpy(ret + 1, payload, length);
}
}
return ret;
}message.h
#ifndef MESSAGE_H
#define MESSAGE_H
typedef struct message
{
unsigned short type;
unsigned short cmd;
unsigned short index;
unsigned short total;
unsigned int length;
unsigned char payload[];
} Message;
Message* Message_New(unsigned short type,
unsigned short cmd,
unsigned short index,
unsigned short total,
const char* payload,
unsigned int length);
#endifclient.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "message.h"
int main()
{
int sock = 0;
struct sockaddr_in addr = {0};
int len = 0;
char buf[128] = {0};
char input[32] = {0};
int r = 0;
Message* pm = NULL;
sock = socket(PF_INET, SOCK_STREAM, 0);
if( sock == -1 )
{
printf("socket error\n");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(8888);
if( connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1 )
{
printf("connect error\n");
return -1;
}
printf("connect success\n");
pm = Message_New(0, 0, 1, 3, "A", 1);
send(sock, pm, sizeof(Message) + 1, 0);
pm = Message_New(0, 0, 2, 3, "B", 1);
send(sock, pm, sizeof(Message) + 1, 0);
pm = Message_New(0, 0, 3, 3, "C", 1);
send(sock, pm, sizeof(Message) + 1, 0);
close(sock);
return 0;
}server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
int main()
{
int server = 0;
struct sockaddr_in saddr = {0};
int client = 0;
struct sockaddr_in caddr = {0};
socklen_t asize = 0;
int len = 0;
char buf[32] = {0};
int r = 0;
server = socket(PF_INET, SOCK_STREAM, 0);
if( server == -1 )
{
printf("server socket error\n");
return -1;
}
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(8888);
if( bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1 )
{
printf("server bind error\n");
return -1;
}
if( listen(server, 1) == -1 )
{
printf("server bind error\n");
return -1;
}
printf("server start success\n");
while( 1 )
{
asize = sizeof(caddr);
client = accept(server, (struct sockaddr*)&caddr, &asize);
if( client == -1 )
{
printf("client accept error\n");
return -1;
}
printf("client: %d\n", client);
do
{
r = recv(client, buf, sizeof(buf), 0);
if( r > 0 )
{
int i = 0;
for(i=0; i<r; i++)
{
printf("%02X ", buf[i]);
}
printf("\n");
}
} while ( r > 0 );
close(client);
}
close(server);
return 0;
}9、应用层协议解析模块(上)
① 问题
- 如何在代码层面封装协议细节?
- 如何将接收缓冲区中的数据解析成为Message ?
② 深度思考
- 数据是否能够解析成为Message ?
数据量足够
- 如果数据量足够,是否能够解析不止—个Message ?
- 如何处理剩余数据(属于下一个Message) ?
数据量不足
- 是否达到协议最小长度(12字节)?
- 如何处理数据量超过最小长度,但不足以创建一个Message 的情况?
③ 初步的解决方案
- 定义一个模块用于从字节流解析Message
- 可从指定内存或从指定文件描述符读取并解析
- 当至少存在12个字节时开始解析
- 首先解析协议中的头信息和数据区长度(length)
- 根据数据区长度继续从字节流读取数据(payload)
- 当协议数据解析完成时,创建Message 并返回,否则,返回NULL
④ 协议解析模块的初步设计
解析器接口定义 C-OOP

创建解析器
从内存中解析
从文件描述符中解析
重置解析器的状态
销毁指定的解析器
解析器数据结构

⑤ 从内存中解析协议数据
条件:内存长度至少连续12个字节

从内存中读取payload中的数据(可多次读取)

⑥ 编程实验:协议解析模块初步设计
msg_parser.c
#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "msg_parser.h"
typedef struct msg_parser
{
Message cache; // 缓存已解析的消息头
int header; // 标识消息头是否解析成功
int need; // 标识还需要多少字节才能完成解析
Message* msg; // 解析中的协议消息(半成品)
} MsgParser;
MParser* MParser_New()
{
MParser* ret = calloc(1, sizeof(MsgParser));
MParser_Reset(ret);
return ret;
}
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length)
{
Message* ret = NULL;
MsgParser* p = (MsgParser*)parser;
if( p && mem && length )
{
if( !p->header )
{ // 解析消息头
if( p->need <= length ) // 所需字节数>12
{
memcpy(&p->cache, mem, p->need); // 拷贝消息头
// 网络字节序的转换为本机字节序
p->cache.type = ntohs(p->cache.type);
p->cache.cmd = ntohs(p->cache.cmd);
p->cache.index = ntohs(p->cache.index);
p->cache.total = ntohs(p->cache.total);
p->cache.length = ntohl(p->cache.length); // 四字节用ntohl
mem += p->need;
length -= p->need;
p->header = 1;
p->need = p->cache.length;
ret = MParser_ReadMem(p, mem, length);
}
}
else
{
if( !p->msg )
{
p->msg = malloc(sizeof(p->cache) + p->need);
if( p->msg )
{
*p->msg = p->cache; // 结构体直接复制
}
}
if( p->msg )
{
unsigned int len = (p->need < length) ? p->need : length;
unsigned int offset = p->msg->length - p->need;
memcpy(p->msg->payload, mem, len);
p->need -= len;
}
if( !p->need )
{
ret = p->msg;
p->msg = NULL;
MParser_Reset(p);
}
}
}
return ret;
}
Message* MParser_ReadFd(MParser* parser, int fd)
{
Message* ret = NULL;
return ret;
}
// 重置解析器
void MParser_Reset(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
p->header = 0;
p->need = sizeof(p->cache); // 12字节
if( p->msg ) // 是否存在协议消息
{
free(p->msg); // 销毁半成品
}
p->msg = NULL; // 设置为空,避免野指针
}
}
void MParser_Del(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
if( p->msg )
{
free(p->msg);
free(p);
}
}
}msg_parser.h
#ifndef MSG_PARSER_H
#define MSG_PARSER_H
#include "message.h"
typedef void MParser;
MParser* MParser_New();
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length);
Message* MParser_ReadFd(MParser* parser, int fd);
void MParser_Reset(MParser* parser);
void MParser_Del(MParser* parser);
#endiftest.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include "msg_parser.h"
int main()
{
MParser* p = MParser_New();
char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04};
char data[] = {0x11, 0x12, 0x13, 0x14};
Message* m = MParser_ReadMem(p, buf, sizeof(buf));
int i = 0;
if( !m )
{
printf("parse again...\n");
m = MParser_ReadMem(p, data, sizeof(data));
}
printf("m = %p\n", m);
if( m )
{
printf("type = %d\n", m->type);
printf("cmd = %d\n", m->cmd);
printf("index = %d\n", m->index);
printf("total = %d\n", m->total);
printf("length = %d\n", m->length);
for(i=0; i<m->length; i++)
{
printf("0x%02X ", m->payload[i]);
}
printf("\n");
free(m);
}
MParser_Del(p);
return 0;
}10、应用层协议解析模块(下)
① 深度思考
从文件描述符是否能够获取足够的数据?
数据量足够
- 读取12字节解析消息头
- 读取数据填充 payload (length)
数据量不足
- 无法获取消息头所需数据(如何处理?解析状态如何切换?
- 无法获取payload完整数据(如何处理?是否可追加?)
是否一定等到数据量足够(如:消息头12字节),才能开始解析。
② 解决方案
策略:尽力获取数据,实时解析
- 即便当前获取1字节,也可根据状态进行解析
- 支持不同数据源多次接力解析(从内存或文件描述符交替获取数据)
充分利用解析器状态信息是实现解决方案的关键。
③ 解析器状态切换

④ 状态切换函数

⑤ 从文件描述符中获取数据 
⑥ 从文件描述符中实时解析消息头

⑦ 从文件描述符中获取payload数据

⑧ 编程实验:从文件描述符解析协议消息
client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>
#include "message.h"
static void hton(Message* m)
{
m->type = htons(m->type);
m->cmd = htons(m->cmd);
m->index = htons(m->index);
m->total = htons(m->total);
m->length = htonl(m->length);
}
int main()
{
int sock = 0;
struct sockaddr_in addr = {0};
int len = 0;
char buf[128] = {0};
char input[32] = {0};
int r = 0;
int i = 0;
char* test = "D.T.Software";
Message* pm = NULL;
sock = socket(PF_INET, SOCK_STREAM, 0);
if( sock == -1 )
{
printf("socket error\n");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(8888);
if( connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1 )
{
printf("connect error\n");
return -1;
}
printf("connect success\n");
for(i=0; i<strlen(test); i++)
{
char buf[2] = {0};
buf[0] = test[i];
pm = Message_New(128, 129, i, strlen(test), buf, 2);
hton(pm);
send(sock, pm, sizeof(Message) + 2, 0);
free(pm);
}
getchar();
close(sock);
return 0;
}server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <unistd.h>
#include <malloc.h>
#include <string.h>
#include "msg_parser.h"
int main()
{
int server = 0;
struct sockaddr_in saddr = {0};
int client = 0;
struct sockaddr_in caddr = {0};
socklen_t asize = 0;
int len = 0;
char buf[32] = {0};
int r = 0;
MParser* parser = MParser_New();
server = socket(PF_INET, SOCK_STREAM, 0);
if( server == -1 )
{
printf("server socket error\n");
return -1;
}
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(8888);
if( bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1 )
{
printf("server bind error\n");
return -1;
}
if( listen(server, 1) == -1 )
{
printf("server bind error\n");
return -1;
}
printf("server start success\n");
while( 1 )
{
struct tcp_info info = {0};
int l = sizeof(info);
asize = sizeof(caddr);
client = accept(server, (struct sockaddr*)&caddr, &asize);
if( client == -1 )
{
printf("client accept error\n");
return -1;
}
printf("client: %d\n", client);
do
{
getsockopt(client, IPPROTO_TCP, TCP_INFO, &info, (socklen_t*)&l);
Message* m = MParser_ReadFd(parser, client);
if( m )
{
printf("type = %d\n", m->type);
printf("cmd = %d\n", m->cmd);
printf("index = %d\n", m->index);
printf("total = %d\n", m->total);
printf("length = %d\n", m->length);
printf("payload = %s\n", m->payload);
printf("\n");
free(m);
}
} while ( info.tcpi_state == TCP_ESTABLISHED );
printf("client socket is closed\n");
close(client);
}
close(server);
return 0;
}message.c
#include "message.h"
#include <malloc.h>
#include <string.h>
/**
* Function: Message_New()
* Description: 构造新消息(客户端)
* Table Accessed:
* Table Updated:
* Input: type 类型消息
* cmd 命令
* index 标识
* total 总数
* payload 数据区
* length 数据长度
* Output:
* Return: ERROR NULL
* SUCCESS 消息地址(Message*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
Message* Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, const char* payload, unsigned int length)
{
Message* ret = malloc(sizeof(Message) + length);
if( ret )
{
ret->type = type;
ret->cmd = cmd;
ret->index = index;
ret->total = total;
ret->length = length;
if( payload && length )
{
memcpy(ret + 1, payload, length);
}
}
return ret;
}message.h
#ifndef MESSAGE_H
#define MESSAGE_H
typedef struct message
{
unsigned short type; // 类型消息
unsigned short cmd; // 命令
unsigned short index; // 标识
unsigned short total; // 总数
unsigned int length; // 数据长度
unsigned char payload[];// 数据区
} Message;
Message* Message_New(unsigned short type,
unsigned short cmd,
unsigned short index,
unsigned short total,
const char* payload,
unsigned int length);
#endifmsg_parser.c
#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "msg_parser.h"
/**
* Function: InitState()
* Description: 状态初始化(服务端)
* Table Accessed:
* Table Updated:
* Input: p 解析器的相关信息
* Output:
* Return:
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
static void InitState(MsgParser* p)
{
p->header = 0;
p->need = sizeof(p->cache);
free(p->msg);
p->msg = NULL;
}
/**
* Function: ToMidState()
* Description: 中间态(服务端)
* Table Accessed:
* Table Updated:
* Input: p 解析器的相关信息
* Output:
* Return: ERROR 0
* SUCCESS 1
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
static int ToMidState(MsgParser* p)
{
p->header = 1;
p->need = p->cache.length;
p->msg = malloc(sizeof(p->cache) + p->need);
if( p->msg )
{
*p->msg = p->cache;
}
return !!p->msg;
}
/**
* Function: ToLastState()
* Description: 最终态(服务端)
* Table Accessed:
* Table Updated:
* Input: p 解析器的相关信息
* Output:
* Return: FAILTRUE NULL
* SUCCESS 消息地址(Message*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
static Message* ToLastState(MsgParser* p)
{
Message* ret = NULL;
if( p->header && !p->need )
{
ret = p->msg;
p->msg = NULL;
}
return ret;
}
/**
* Function: ntoh()
* Description: 网络字节序转为本机字节序(服务端)
* Table Accessed:
* Table Updated:
* Input: p 解析器的相关信息
* Output:
* Return:
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
static void ntoh(Message* m)
{
m->type = ntohs(m->type);
m->cmd = ntohs(m->cmd);
m->index = ntohs(m->index);
m->total = ntohs(m->total);
m->length = ntohl(m->length);
}
/**
* Function: ToRecv()
* Description: 接收一定量的数据(服务端)
* Table Accessed:
* Table Updated:
* Input: fd 文件描述符
* size 欲接受的数据量
* Output: buf 接收到的数据
* Return: i 实际接收到的数据量
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
static int ToRecv(int fd, char* buf, int size)
{
int retry = 0;
int i = 0;
while( i < size )
{
int len = read(fd, buf + i, size - i);
if( len > 0 )
{
i += len;
}
else if( len < 0 )
{
break;
}
else
{
if( retry++ > 5 )
{
break;
}
usleep(200 * 1000);
}
}
return i;
}
/**
* Function: MParser_New()
* Description: 创建新解析器(服务端)
* Table Accessed:
* Table Updated:
* Input:
* Output:
* Return: FAILTRUE NULL
* SUCCESS 解析器参数地址(MParser*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
MParser* MParser_New()
{
MParser* ret = calloc(1, sizeof(MsgParser));
if( ret )
{
InitState(ret);
}
return ret;
}
/**
* Function: MParser_ReadMem()
* Description: 从内存读取数据(服务端)
* Table Accessed:
* Table Updated:
* Input: parser 解析器参数
* mem 数据的内存地址
* length 数据量
* Output:
* Return:
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length)
{
Message* ret = NULL;
MsgParser* p = (MsgParser*)parser;
if( p && mem && length )
{
if( !p->header )
{
int len = (p->need < length) ? p->need : length;
int offset = sizeof(p->cache) - p->need;
memcpy((char*)&p->cache + offset, mem, len);
if( p->need == len )
{
ntoh(&p->cache);
mem += p->need;
length -= p->need;
if( ToMidState(p) )
{
ret = MParser_ReadMem(p, mem, length);
}
else
{
InitState(p);
}
}
else
{
p->need -= len;
}
}
else
{
if( p->msg )
{
int len = (p->need < length) ? p->need : length;
int offset = p->msg->length - p->need;
memcpy(p->msg->payload + offset, mem, len);
p->need -= len;
}
if( ret = ToLastState(p) )
{
InitState(p);
}
}
}
return ret;
}
/**
* Function: MParser_ReadFd()
* Description: 从文件描述符读取数据(服务端)
* Table Accessed:
* Table Updated:
* Input: fd 文件描述符
* parser 解析器参数
* Output:
* Return: Message (MParser*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
Message* MParser_ReadFd(MParser* parser, int fd)
{
Message* ret = NULL;
MsgParser* p = (MsgParser*)parser;
if( (fd != -1) && p )
{
if( !p->header )
{
int offset = sizeof(p->cache) - p->need;
int len = ToRecv(fd, (char*)&p->cache + offset, p->need);
if( len == p->need )
{
ntoh(&p->cache);
if( ToMidState(p) )
{
ret = MParser_ReadFd(p, fd);
}
else
{
InitState(p);
}
}
else
{
p->need -= len;
}
}
else
{
if( p->msg )
{
int offset = p->msg->length - p->need;
int len = ToRecv(fd, p->msg->payload + offset, p->need);
p->need -= len;
}
if( ret = ToLastState(p) )
{
InitState(p);
}
}
}
return ret;
}
/**
* Function: MParser_Reset()
* Description: 复位解析器(服务端)
* Table Accessed:
* Table Updated:
* Input:
* Output:
* Return: FAILTRUE NULL
* SUCCESS 解析器参数地址(MParser*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
void MParser_Reset(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
InitState(p);
}
}
/**
* Function: MParser_Del()
* Description: 删除解析器(服务端)
* Table Accessed:
* Table Updated:
* Input:
* Output:
* Return: FAILTRUE NULL
* SUCCESS 解析器参数地址(MParser*)
* Others:
* Modify Date Version Author Modification
* ---------------------------------------------------------
* 2022/02/04 1.0 Exp.Joker Create
**/
void MParser_Del(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
free(p->msg);
free(p);
}
}msg_parser.h
#ifndef MSG_PARSER_H
#define MSG_PARSER_H
#include "message.h"
typedef void MParser;
typedef struct msg_parser
{
Message cache; // 缓存已解析的消息头
int header; // 标识消息头是否解析成功
int need; // 标识还需要多少字节才能完成解析
Message* msg; // 解析中的协议消息(半成品)
} MsgParser;
MParser* MParser_New();
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length);
Message* MParser_ReadFd(MParser* parser, int fd);
void MParser_Reset(MParser* parser);
void MParser_Del(MParser* parser);
#endiftest.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include "msg_parser.h"
int main()
{
MParser* p = MParser_New();
char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00};
char another[] = {0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04};
char data[] = {0x11, 0x12, 0x13, 0x14};
Message* m = MParser_ReadMem(p, buf, sizeof(buf));
int i = 0;
if( !m )
{
printf("parse again...\n");
m = MParser_ReadMem(p, another, sizeof(another));
}
if( !m )
{
printf("parse again again...\n");
m = MParser_ReadMem(p, data, sizeof(data));
}
printf("m = %p\n", m);
if( m )
{
printf("type = %d\n", m->type);
printf("cmd = %d\n", m->cmd);
printf("index = %d\n", m->index);
printf("total = %d\n", m->total);
printf("length = %d\n", m->length);
for(i=0; i<m->length; i++)
{
printf("0x%02X ", m->payload[i]);
}
printf("\n");
free(m);
}
MParser_Del(p);
return 0;
}