外观
05.TCP通信框架:客户端和服务端设计
约 2298 字大约 8 分钟
嵌入式NETLinux单片机
2022-06-14
11、TCP通信框架:客户端设计
① TCP通信框架设计:封装、复用

客户端
- 以协议消息为基本单位收发数据
- 同时支持字节为基本单位收发数据
服务端
- 负责监听连接,并产生通信客户端(服务器产生的)
- 负责监听数据通信状态,并给出通知


② 职责定义(功能定义)
客户端用于进行实际的双向数据通信
- 数据发送&数据接收(协议消息、字节)
服务端仅用于监听和回调通知
- 事件类型:连接,数据,断开
- 事件回调: void (*Listener)(TcpClient*client, int event);
③ 客户端接口设计

④ 客户端关键代码实现


⑤ 编程实验:客户端设计与实现
代码与下一节合并
12、TCP通信框架:服务端设计
① TCP通信框架设计

服务端
负责监听连接状态
- Connect:产生通信客户端(TcpClient),并给出事件通知
- Close:给出事件通知,并销毁通信客户端
负责监听数据通信状态,并给出事件通知
服务端事件设计
EVT_CONN
- 客户端连接服务端时触发,并创建TcpClient用于通信
EVT_DATA
- 客户端数据到达服务端时触发,使用TcpClient读取数据
EVT_CLOSE
- 客户端断开服务端时触发,相关TcpClient将被销毁

问题:服务端如何知道什么时候进行事件回调通知?
- 服务端通过select机制循环触发事件回调!

② 服务端接口设计

③ 服务端关键代码实现-初始化

④ 服务端关键代码实现-事件监听

⑤ 服务端关键代码实现-连接事件&数据事件

⑥ 服务端关键代码实现-断连事件&事件通知

⑦ 编程实验:服务端设计与实现
client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>
#include "message.h"
#include "tcp_client.h"
int main()
{
int i = 0;
char* test = "D.T.Software";
Message* pm = NULL;
TcpClient* client = TcpClient_New();
if( client && TcpClient_Connect(client, "127.0.0.1", 8888) )
{
printf("connect success\n");
for(i=0; i<strlen(test); i++)
{
char buf[2] = {0};
buf[0] = test[i];
pm = Message_New(128, 129, i, strlen(test), buf, 2);
TcpClient_SendMsg(client, pm);
free(pm);
}
}
getchar();
TcpClient_Del(client);
return 0;
}server.c
#include <stdio.h>
#include <unistd.h>
#include <malloc.h>
#include <string.h>
#include "tcp_server.h"
void EventListener(TcpClient* client, int evt)
{
if( evt == EVT_CONN )
{
printf("Connect: %p\n", client);
}
else if( evt == EVT_DATA )
{
Message* m = TcpClient_RecvMsg(client);
if( m )
{
char* s = TcpClient_GetData(client);
if( m->index == 0 )
{
s = malloc(m->total + 1);
TcpClient_SetData(client, s);
}
strcpy(s+m->index, m->payload);
if( (m->index + 1) == m->total )
{
printf("Data: %s\n", s);
free(s);
}
free(m);
}
}
else if( evt == EVT_CLOSE )
{
printf("Close: %p\n", client);
}
}
int main()
{
TcpServer* server = TcpServer_New();
if( server )
{
int r = TcpServer_Start(server, 8888, 20);
printf("r = %d\n", r);
if( r )
{
TcpServer_SetListener(server, EventListener);
TcpServer_DoWork(server);
}
}
return 0;
}message.c
#include "message.h"
#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
Message* Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, const char* payload, unsigned int length)
{
Message* ret = malloc(sizeof(Message) + length);
if( ret )
{
ret->type = type;
ret->cmd = cmd;
ret->index = index;
ret->total = total;
ret->length = length;
if( payload )
{
memcpy(ret + 1, payload, length);
}
}
return ret;
}
int Message_Size(Message* m)
{
int ret = 0;
if( m )
{
ret = sizeof(Message) + m->length;
}
return ret;
}
Message* Message_N2H(Message* m)
{
if( m )
{
m->type = ntohs(m->type);
m->cmd = ntohs(m->cmd);
m->index = ntohs(m->index);
m->total = ntohs(m->total);
m->length = ntohl(m->length);
}
return m;
}
Message* Message_H2N(Message* m)
{
if( m )
{
m->type = htons(m->type);
m->cmd = htons(m->cmd);
m->index = htons(m->index);
m->total = htons(m->total);
m->length = htonl(m->length);
}
return m;
}message.h
#ifndef MESSAGE_H
#define MESSAGE_H
typedef struct message
{
unsigned short type;
unsigned short cmd;
unsigned short index;
unsigned short total;
unsigned int length;
unsigned char payload[];
} Message;
Message* Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, const char* payload, unsigned int length);
int Message_Size(Message* m);
Message* Message_N2H(Message* m);
Message* Message_H2N(Message* m);
#endifmsg_parser.c
#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "msg_parser.h"
typedef struct msg_parser
{
Message cache; // 缓存已解析的消息头
int header; // 标识消息头是否解析成功
int need; // 标识还需要多少字节才能完成解析
Message* msg; // 解析中的协议消息(半成品)
} MsgParser;
static void InitState(MsgParser* p)
{
p->header = 0;
p->need = sizeof(p->cache);
free(p->msg);
p->msg = NULL;
}
static int ToMidState(MsgParser* p)
{
p->header = 1;
p->need = p->cache.length;
p->msg = malloc(sizeof(p->cache) + p->need);
if( p->msg )
{
*p->msg = p->cache;
}
return !!p->msg;
}
static Message* ToLastState(MsgParser* p)
{
Message* ret = NULL;
if( p->header && !p->need )
{
ret = p->msg;
p->msg = NULL;
}
return ret;
}
static int ToRecv(int fd, char* buf, int size)
{
int retry = 0;
int i = 0;
while( i < size )
{
int len = read(fd, buf + i, size - i);
if( len > 0 )
{
i += len;
}
else if( len < 0 )
{
break;
}
else
{
if( retry++ > 5 )
{
break;
}
usleep(200 * 1000);
}
}
return i;
}
MParser* MParser_New()
{
MParser* ret = calloc(1, sizeof(MsgParser));
if( ret )
{
InitState(ret);
}
return ret;
}
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length)
{
Message* ret = NULL;
MsgParser* p = (MsgParser*)parser;
if( p && mem && length )
{
if( !p->header )
{
int len = (p->need < length) ? p->need : length;
int offset = sizeof(p->cache) - p->need;
memcpy((char*)&p->cache + offset, mem, len);
if( p->need == len )
{
Message_N2H(&p->cache);
mem += p->need;
length -= p->need;
if( ToMidState(p) )
{
ret = MParser_ReadMem(p, mem, length);
}
else
{
InitState(p);
}
}
else
{
p->need -= len;
}
}
else
{
if( p->msg )
{
int len = (p->need < length) ? p->need : length;
int offset = p->msg->length - p->need;
memcpy(p->msg->payload + offset, mem, len);
p->need -= len;
}
if( ret = ToLastState(p) )
{
InitState(p);
}
}
}
return ret;
}
Message* MParser_ReadFd(MParser* parser, int fd)
{
Message* ret = NULL;
MsgParser* p = (MsgParser*)parser;
if( (fd != -1) && p )
{
if( !p->header )
{
int offset = sizeof(p->cache) - p->need;
int len = ToRecv(fd, (char*)&p->cache + offset, p->need);
if( len == p->need )
{
Message_N2H(&p->cache);
if( ToMidState(p) )
{
ret = MParser_ReadFd(p, fd);
}
else
{
InitState(p);
}
}
else
{
p->need -= len;
}
}
else
{
if( p->msg )
{
int offset = p->msg->length - p->need;
int len = ToRecv(fd, p->msg->payload + offset, p->need);
p->need -= len;
}
if( ret = ToLastState(p) )
{
InitState(p);
}
}
}
return ret;
}
void MParser_Reset(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
InitState(p);
}
}
void MParser_Del(MParser* parser)
{
MsgParser* p = (MsgParser*)parser;
if( p )
{
free(p->msg);
free(p);
}
}msg_parser.h
#ifndef MSG_PARSER_H
#define MSG_PARSER_H
#include "message.h"
typedef void MParser;
MParser* MParser_New();
Message* MParser_ReadMem(MParser* parser, unsigned char* mem, unsigned int length);
Message* MParser_ReadFd(MParser* parser, int fd);
void MParser_Reset(MParser* parser);
void MParser_Del(MParser* parser);
#endiftcp_client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <malloc.h>
#include "tcp_client.h"
#include "msg_parser.h"
typedef struct tcp_client
{
int fd;
MParser* parser;
void* data;
} Client;
TcpClient* TcpClient_New()
{
return TcpClient_From(-1);
}
TcpClient* TcpClient_From(int fd)
{
Client* ret = malloc(sizeof(Client));
if( ret )
{
ret->fd = fd;
ret->parser = MParser_New();
ret->data = NULL;
}
return (ret && ret->parser) ? ret : (free(ret), NULL);
}
int TcpClient_SendMsg(TcpClient* client, Message* msg)
{
int ret = 0;
Client* c = (Client*)client;
if( c && msg )
{
int len = Message_Size(msg);
char* data = (char*)Message_H2N(msg);
ret = (send(c->fd, data, len, 0) != -1);
Message_N2H(msg);
}
return ret;
}
int TcpClient_SendRaw(TcpClient* client, char* buf, int length)
{
int ret = 0;
Client* c = (Client*)client;
if( c && buf )
{
ret = send(c->fd, buf, length, 0);
}
return ret;
}
Message* TcpClient_RecvMsg(TcpClient* client)
{
Message* ret = NULL;
Client* c = (Client*)client;
if( c )
{
ret = MParser_ReadFd(c->parser, c->fd);
}
return ret;
}
int TcpClient_RecvRaw(TcpClient* client, char* buf, int length)
{
int ret = 0;
Client* c = (Client*)client;
if( c && buf )
{
ret = recv(c->fd, buf, length, 0);
}
return ret;
}
int TcpClient_Connect(TcpClient* client, char* ip, int port)
{
int ret = TcpClient_IsValid(client);
Client* c = (Client*)client;
if( !ret && ip && c && ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) != -1) )
{
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
addr.sin_port = htons(port);
ret = (connect(c->fd, (struct sockaddr*)&addr, sizeof(addr)) != -1);
}
return ret;
}
int TcpClient_IsValid(TcpClient* client)
{
int ret = 0;
Client* c = (Client*)client;
if( c )
{
struct tcp_info info = {0};
int l = sizeof(info);
getsockopt(c->fd, IPPROTO_TCP, TCP_INFO, &info, (socklen_t*)&l);
ret = (info.tcpi_state == TCP_ESTABLISHED);
}
return ret;
}
void TcpClient_Close(TcpClient* client)
{
Client* c = (Client*)client;
if( c )
{
close(c->fd);
c->fd = -1;
MParser_Reset(c->parser);
}
}
void TcpClient_Del(TcpClient* client)
{
Client* c = (Client*)client;
if( c )
{
TcpClient_Close(c);
MParser_Del(c->parser);
free(c);
}
}
void TcpClient_SetData(TcpClient* client, void* data)
{
Client* c = (Client*)client;
if( c )
{
c->data = data;
}
}
void* TcpClient_GetData(TcpClient* client)
{
void* ret = NULL;
Client* c = (Client*)client;
if( c )
{
ret = c->data;
}
return ret;
}tcp_client.h
#ifndef TCP_CLIENT_H
#define TCP_CLIENT_H
#include "message.h"
typedef void TcpClient;
TcpClient* TcpClient_New();
TcpClient* TcpClient_From(int fd);
int TcpClient_SendMsg(TcpClient* client, Message* msg);
int TcpClient_SendRaw(TcpClient* client, char* buf, int length);
Message* TcpClient_RecvMsg(TcpClient* client);
int TcpClient_RecvRaw(TcpClient* client, char* buf, int length);
int TcpClient_Connect(TcpClient* client, char* ip, int port);
int TcpClient_IsValid(TcpClient* client);
void TcpClient_Close(TcpClient* client);
void TcpClient_Del(TcpClient* client);
void TcpClient_SetData(TcpClient* client, void* data);
void* TcpClient_GetData(TcpClient* client);
#endiftcp_server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <malloc.h>
#include "tcp_client.h"
#include "tcp_server.h"
#define FD_SIZE 1024
typedef struct tcp_server
{
int fd;
int valid;
Listener cb;
TcpClient* client[FD_SIZE];
} Server;
TcpServer* TcpServer_New()
{
Server* ret = malloc(sizeof(Server));
if( ret )
{
int i = 0;
ret->fd = -1;
ret->valid = 0;
ret->cb = NULL;
for(i=0; i<FD_SIZE; i++)
{
ret->client[i] = NULL;
}
}
return ret;
}
int TcpServer_Start(TcpServer* server, int port, int max)
{
Server* s = (Server*)server;
if( s && !s->valid )
{
struct sockaddr_in saddr = {0};
s->fd = socket(PF_INET, SOCK_STREAM, 0);
s->valid = (s->fd != -1);
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(port);
s->valid = s->valid && (bind(s->fd, (struct sockaddr*)&saddr, sizeof(saddr)) != -1 );
s->valid = s->valid && (listen(s->fd, max) != -1);
}
return s->valid;
}
void TcpServer_Stop(TcpServer* server)
{
Server* s = (Server*)server;
if( s )
{
int i = 0;
s->valid = 0;
close(s->fd);
for(i=0; i<FD_SIZE; i++)
{
TcpClient_Del(s->client[i]);
s->client[i] = NULL;
}
}
}
void TcpServer_SetListener(TcpServer* server, Listener listener)
{
Server* s = (Server*)server;
if( s )
{
s->cb = listener;
}
}
int TcpServer_IsValid(TcpServer* server)
{
return server ? ((Server*)server)->valid : 0;
}
static int SelectHandler(Server* s, fd_set* rset, fd_set* reads, int num, int max)
{
int ret = max;
int i = 0;
for(i=0; i<=max; i++)
{
if( FD_ISSET(i, rset) )
{
int index = i;
int event = -1;
if( index == s->fd )
{
struct sockaddr_in caddr = {0};
socklen_t asize = sizeof(caddr);
index = accept(s->fd, (struct sockaddr*)&caddr, &asize);
if( index > -1 )
{
FD_SET(index, reads);
ret = (index > max) ? index : max;
s->client[index] = TcpClient_From(index);
event = EVT_CONN;
}
}
else
{
event = EVT_DATA;
}
if( s->cb )
{
if( TcpClient_IsValid(s->client[index]) )
{
s->cb(s->client[index], event);
}
else
{
if( s->client[index] )
{
s->cb(s->client[index], EVT_CLOSE);
}
TcpClient_Del(s->client[index]);
s->client[index] = NULL;
FD_CLR(index, reads);
}
}
}
}
return ret;
}
void TcpServer_DoWork(TcpServer* server)
{
Server* s = (Server*)server;
if( s && s->valid )
{
int max = 0;
int num = 0;
fd_set reads = {0};
fd_set rset = {0};
struct timeval timeout = {0};
FD_ZERO(&reads);
FD_SET(s->fd, &reads);
max = s->fd;
while( s->valid )
{
rset = reads;
timeout.tv_sec = 0;
timeout.tv_usec = 10000;
num = select(max+1, &rset, 0, 0, &timeout);
if( num > 0 )
{
max = SelectHandler(s, &rset, &reads, num, max);
}
}
}
}
void TcpServer_Del(TcpServer* server)
{
TcpServer_Stop(server);
free(server);
}tcp_server.h
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include "tcp_client.h"
typedef void TcpServer;
typedef void (*Listener)(TcpClient*, int);
enum
{
EVT_CONN,
EVT_DATA,
EVT_CLOSE
};
TcpServer* TcpServer_New();
int TcpServer_Start(TcpServer* server, int port, int max);
void TcpServer_Stop(TcpServer* server);
void TcpServer_SetListener(TcpServer* server, Listener listener);
int TcpServer_IsValid(TcpServer* server);
void TcpServer_DoWork(TcpServer* server);
void TcpServer_Del(TcpServer* server);
#endif