Live555源码解析思路分享
开始分析MediaServer源码前,结合官方文档说明,对各文件夹源码总结如下:
groupsock
静态库,封装了network interfaces和sockets。举例而言,Groupsock类中封装了一个收发组播数据包的socket。liveMedia
静态库,定义了一系列类簇,根类为Medium,这些类支持了多种媒体类型和编解码器。WindowsAudioInputDevice
静态库,实现了liveMedia中的AudioInputDevice虚基类,可为Windows程序提供从输入设备中读取PCM audio samples的功能。UsageEnvironment
静态库,内部主要有如下几个虚基类:- TaskScheduler
为DelayedTask、socket后台操作处理、事件event提供调度支持 - HashTable
定义通用hash表的接口,供其他代码使用 - UsageEnvironment
集成TaskScheduler、groupsock、liveMedia等模块功能,添加Result、Error消息机制,从而提供使用环境。
- TaskScheduler
BasicUsageEnvironment
静态库,由于UsageEnvironment中仅给出虚基类定义,为了能够真正地运行,所以从UsageEnvironment继承并实现了BasicUsageEnvironment类。它用于实现易用的命令行程序,其中,Read events和delayed operations通过select()循环进行支持。mediaServer
可执行文件,通过逻辑集成上述库所提供的的功能,提供串流服务。
面对一个庞大的项目工程,想要分析源码,必须先找出有效突破口,本文中将从live555MediaServer.cpp 中main()函数开始切割工程。
1. main()
main()函数并不复杂,下面列出整理后的源码:
int main(int argc, char **argv)
{
// 准备使用环境
TaskScheduler* scheduler = BasicTaskScheduler::createNew();
UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);
// RTSP Server授权控制,默认关闭. 如需开启,按如下步骤:
// 1. define ACCESS_CONTROL
// 2. authDB->addUserRecord("xxxx", "xxxx");
UserAuthenticationDatabase* authDB = NULL;
#ifdef ACCESS_CONTROL
authDB = new UserAuthenticationDatabase;
authDB->addUserRecord("username1", "password1");
#endif
// 创建RTSPServer,默认使用554端口,如已被占用或其他错误,则尝试使用8554端口
RTSPServer* rtspServer;
portNumBits rtspServerPortNum = 554;
rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
if(rtspServer == NULL)
{
rtspServerPortNum = 8554;
rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
}
if(rtspServer == NULL)
{
*env << "Failed to create RTSP server: " << env->getResultMsg() <<"\n";
exit(1);
}
// 打印版本信息、rtspURL前缀、及其他说明信息
*env << ...
//@1 尝试建立一个HTTP server,用于提供 RTSP-over-HTTP通道,尝试端口为80, 8000, 8080
if(rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080))
{
*env << "(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling, or for HTTP live streaming (for indexed Transport Stream files only).)\n";
}
else
{
*env << "(RTSP-over-HTTP tunneling is not available.)\n";
}
//@2 开启事件循环
env->taskScheduler().doEventLoop();
return 0;
}
源码分析:
@1 建立HTTP server
所谓Tunneling,实质上分为三个步骤:- 将源数据(无论什么格式)抽象为简单数据
- 发送端使用目标协议对数据进行包裹,此处为HTTP
- 接收端参照协议对数据解包,重新获得源数据
为什么要进行HTTP Tunneling?
因为部分客户端可能为iPhones或iPads,而这些设备仅支持Apple家的HLS(HTTP Live Streaming)机制的串流。具体信息可参阅官方文档中Streaming to iPhones and iPads一节。
- @2 开启事件循环
时间循环的开启实质上是通过UsageEnvironment的TaskScheduler中doEventLoop()函数完成的,这是整个Server活过来的key,因此必然需要进一步跟踪进去。
2. env->taskScheduler().doEventLoop()
先放源码:
void BasicTaskScheduler0::doEventLoop(char volatile* watchVariable)
{
while(1){
//@1 wathcVariable
if(watchVariable != NULL && *watchVariable != 0) break;
//@2 SingleStep()
SingleStep();
}
}
源码分析:
- @1 watchVariable 当且仅当watchVariable有值时提前break
查找doEventLoop()调用处,发现除main()中该值为NULL外,其他地方都给出了变量用于控制执行。
调用者分别有:
```
- live555MediaServer -> main()
- DynamicRTSPServer
- AC3AudioStreamFramer
- DVVideoStreamFramer
- H264VideoFileServerMediaSubsession
- H265VideoFileServerMediaSubsession
- MP3StreamState
- MPEG1or2FileServerDemux
- MPEG4VideoFileServerMediaSubsession
- SIPClient
```
- @2 SingleStep
查找得到该函数实现部分位于BasicTaskScheduler中。
此处不免有些困惑,为何不直接放在类BasicTaskScheduler0中?
为了解开这个困惑,此处先粗略整理如下继承、功能关系(后续会具体分析):
- 虚基类TaskScheduler
内含DelayedTask、socket operations background handling以及直接的EventTrigger三类事件处理接口。- BasicTaskScheduler0 : public TaskScheduler
主要实现doEventLoop()函数及DelayedTask、EventTrigger部分- BasicTaskScheduler : public BasicTaskScheduler0
主要实现SingleStep()函数及background handling部分
3. SingleStep()
放出整理后源码:
void BasicTaskScheduler::SingleStep(unsigned maxDelayTime)
{
fd_set readSet = fReadSet;
fd_set writeSet = fWriteSet;
fd_set exceptionSet = fExceptionSet
DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();
struct timeval tv_timeToDelay;
tv_timeToDelay.tv_sec = timeToDelay.seconds();
tv_timeToDelay.tv_usec = timeToDelay.useconds();
// .tv_sec过大(<= 11.5days)会引发select()失败,所以先规范tv_timeToDelay值域
...
//@1 select
int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
if(selectResult <0)
{
if( GetLastError() != EINTR )
{
// 异常错误,视为严重故障;打印错误信息后退出
print_Set_info();
abort();
}
}
else //if(selectResult <0)
{
//@2 HandlerIterator, HandlerDescriptor, HandlerSet
HandlerIterator iter(*fHandlers);
HandlerDescriptor* handler;
//@3 fLastHandledSocketNum
if(fLastHandledSocketNum >= 0)
{
// 如已处理过socket读写,则找到前次socket读写的下一个链表节点
while((handler = iter.next()) != NULL)
{
if(handler->socketNum == fLastHandledSocketNum) break;
}
if(handler == NULL)
{
// 未找到,重置相关值
fLastHandlerSocketNum = -1;
iter.reset();
}
}
while((handler = iter.next()) != NULL)
{
// 找到链表中合法节点,开始处理
int sock = handler->socketNum;
int resultConditionSet = 0;
if(FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)) resultConditionSet |= SOCKET_READABLE;
if(FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet) resultConditionSet |= SOCKET_WRITEABLE;
if(FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet) resultConditionSet |= SOCKET_EXCEPTION;
if((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL)
{
// 保存当前处理节点socketNum
fLastHandledSocketNum = sock;
(*handler->handlerProc)(handler->clientData, resultConditionSet);
break;
}
} // while((handler = iter.next()) != NULL)
if(handler == NULL && fLastHandledSocketNum >= 0)
{
// 上一次处理的socketNum不为0,且未在当前链表中找到,但只要链表本身不为空,就必须处理socket读写
int sock = handler->socketNum;
int resultConditionSet = 0;
if(FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)) resultConditionSet |= SOCKET_READABLE;
if(FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet) resultConditionSet |= SOCKET_WRITEABLE;
if(FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet) resultConditionSet |= SOCKET_EXCEPTION;
if((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL)
{
fLastHandledSocketNum = sock;
(*handler->handlerProc)(handler->clientData, resultConditionSet);
break;
}
if(handler == NULL) fLastHandledSocketNum = -1;
}
//@4 fTriggersAwaitingHandling && fLastUsedTriggerMask
if(fTriggersAwaitingHandling != 0)
{
if(fTriggersAwaitingHandling == fLastUsedTriggerMask)
{
fTriggersAwaitingHandling &= ~fLastUsedTriggerMask;
if(fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL)
{
(*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDtas[fLastUsedTriggerNum]);
}
}
else
{
unsigned i = fLastUsedTriggerNum;
EventTriggerId mask = fLastUsedTriggerMask;
do{
i = (i+1)%MAX_NUM_EVENT_TRIGGERS;
mask >>= 1;
if(mask == 0) mask = 0x80000000;
if((fTriggersAwaitingHandling&mask) != 0)
{
fTriggersAwaitingHandling &= ~mask;
if(fTriggeredEventHandlers[i] != NULL)
{
(*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]);
}
fLastUsedTriggerMask = mask;
fLastUsedTriggerNum = i;
break;
}
} while(i != fLastUsedTriggerNum);
}
}
}
//@5 hadnleAlarm
fDelayQueue.hadnleAlarm();
}
源码分析:
- @1 select
select(或pselect)允许进程监控多个文件描述符(fd),阻塞直到至少一个fd处于I/O操作ready状态为止。
通常fd会在可进行进行块读取或高效写操作时,切换为ready状态。
当select成功时,会返回参数中三个set(read/write/exception)之一的fd,如返回值为0,表示超时前并未出发任何fd。
如果返回值为-1,表示函数出错,可进一步获取文件错误代码(Window使用WSAGetLastError, Linux中直接使用errno值)并进行判断分析:
- EBADF set参数中存在无效fd
- EINTR 中断信号
- EINVAL nfds参数为负数,或timeout时长无效
- ENOMEM 内部表格分配内存失败
更进一步的select说明,将在后续单独说明后给出链接。
- @2 HandlerIterator, HandlerDescriptor, HandlerSet
千言万语不如一张图。下图中列出三者关系,颜色与说明相对应。
那么问题来了,该双向链表是怎么产生的?也就是说谁,何时,在哪里调用了HandlerSet::assignHandler()函数?
通过查找assignHandler,发现唯一调用入口为BasicTaskScheduler::setBackgroundHandling;
进一步的,调用setBackgroundHandling()函数的有如下位置:
- GenericMediaServer::ClientConnection::ClientConnection() READABLE|EXCEPTION incomingRequstHandler
- SocketDescriptor::registerRTPInterface() READABLE|EXCEPTION tcpReadHandler
- RTSP::RTSPClient() READABLE|EXCEPTION incomingDataHandler
- RTSPClient::openConnection() READABLE|EXCEPTION incomingDataHandler
- RTSPClient::openConnection() READABLE|EXCEPTION incomingDataHandler
- RTSPClient::connectToServer() WRITABLE|EXCEPTION connectionHandler
- RTSPClient::handleAlternativeRequestByte1() READABLE|EXCEPTION incomingDataHandler
- RTSPClient::connectionHandler1() READABLE|EXCEPTION incomingDataHandler
- RTSPServer::RTSPClientConnection::handleAlternativeRequestByte1() READABLE|EXCEPTION incomingRequestHandler
- RTSPServer::RTSPClientConnection::changeClientInputSocket() READABLE|EXCEPTION incomingRequstHandler
- TCPStreamSink::processBuffer() WRITABLE socketWritableHandler
大致分析可得出如下结论:
- 事件来源主要为创建RTSP实例,RTSPClient、RTSPServer间交互,以及TCPSink进行Buffer处理
事件Handler对应为Request处理,连接建立、接收数据处理,以及TCP读写
事件部分本篇就到此为止,后续将更进一步展开分析。@3 fLastHandledSocketNum
fLastHandledSocketNum只有两种取值可能:- -1 :HandlerSet为空时,因此默认值也为-1
- 已成功处理过得Handler socketNum值
具体执行过程,已在上述源码中添加注释。
@4 fTriggersAwaitingHandling && fLastUsedTriggerMask
由于event handler会修改可读socket set,因此我们在处理完socket后才开始检查Event情况。
fTriggersAwaitingHandling和fLastUsedTriggerMask都是以32bit值进行实现的,也就是说,使用时,他们的每一个bit都具有意义。
默认情况下,fTriggersAwaitingHandling值为0,fLastUsedTriggerMask为1。而源码中可以看到,如fTriggersAwaitingHandling为0值时,是不会启动event trigger机制的。
因此首先要找到,fTriggersAwaitingHandling值会首先在哪里被修改。结果如下:
- BasicTaskScheduler0::deleteEventTrigger() fTriggersAwaitingHandling &= ~eventTriggerId
- BasicTaskScheduler0::triggerEvent() fTriggersAwaitingHandling |= eventTriggerId;
即Event处理机制启动前,必须先triggerEvent进行添加,且在处理前未deleteEventTrigger才可以生效。
fLastUsedTriggerMask的功效和fLastHandledSocketNum类似,都只是为了更快的定位到开始点,并进行循环查找。
- @5 hadnleAlarm
socket IO和event都处理完后,终于轮到延时任务,它是通过handleAlarm进行处理。
handleAlarm()
惯例先放源码:
void DelayQueue::handleAlarm()
{
// @1 synchronize
if(head()->fDeltaTimeRemaining != DELAY_ZERO) synchronize();
if(head()->fDelatTimeRemaining == DELAY_ZERO)
{
DelayQueueEntry *toRemove = head();
removeEntry(toRemove);
toRemove->handleTimeout();
}
}
源码分析:
- @1 synchronize
检查到DeltaTimeRemaining不为0时,需要进行同步。这里需要补充下定时器的概念。
> 定时器实现
> 计算机中存在两种时钟,分别为:
> * 硬件时钟,电池供电,也称为RTC,在系统关机时起作用
> * 系统时钟,基于RTC根据CPU中断计时,电脑运行时有效
>
> 两者频率并不严格相同,细节部分这里不具体讨论,这里只要知道系统时钟的前提是CPU中断即可。
假设开启了一个计时器,比如Sleep(5000),然而CPU每个时钟周期并非以ms为单位,如2.4GHz的CPU,时钟周期为(1/2.4G)s,因此每个时钟周期或中断产生(具体机制视CPU而定),会检查剩余时间是否为0甚至小于0。时钟周期的差异会引发计时器的误差,也因为如此,计时器都是有精度的,并非都能实现所需精度。
回到源码,这里的DelayedTask也是使用了类似的定时器机制,每次同步仅仅是跟当前时间作比较,并跟新timeSinceLastSync。
当timeSinceLastSync >= fDeltaTimeRemaining时,判定为定时器timeout,并执行对应TaskFunc()。
总结
至此,MediaServer大的轮廓已经分析完毕,主要相关的动线有三条:
- Socket I/O
- Event
- DelayedTask
只要牢牢抓住这三条动线,结合相关入口、出口,就可以将MediaServer源码的血肉充实起来。本篇中如有疑虑或错误指出,还请大家指出。同样可以提出想要着重了解的内容,后续更新会选择性加入其中。
- 分享
- 举报
-
浏览量:2735次2020-07-29 15:54:29
-
浏览量:885次2023-10-09 16:20:04
-
浏览量:3489次2018-04-26 15:06:40
-
浏览量:759次2024-02-23 16:58:46
-
浏览量:4394次2020-08-11 10:39:44
-
2020-10-17 18:27:28
-
浏览量:797次2023-12-11 11:37:59
-
浏览量:3978次2020-04-26 17:55:16
-
浏览量:3631次2020-08-14 11:35:53
-
浏览量:920次2024-01-02 17:24:57
-
浏览量:569次2023-12-13 18:50:38
-
浏览量:3676次2023-12-02 14:17:30
-
浏览量:2619次2020-03-26 10:21:51
-
浏览量:20640次2020-12-07 16:37:34
-
浏览量:4310次2019-01-26 17:32:14
-
浏览量:1499次2019-09-06 17:19:50
-
浏览量:15817次2020-09-28 11:08:13
-
浏览量:4954次2020-12-20 16:34:53
-
浏览量:2396次2017-12-23 08:55:32
-
广告/SPAM
-
恶意灌水
-
违规内容
-
文不对题
-
重复发帖
老笨啊~
感谢您的打赏,如若您也想被打赏,可前往 发表专栏 哦~
举报类型
- 内容涉黄/赌/毒
- 内容侵权/抄袭
- 政治相关
- 涉嫌广告
- 侮辱谩骂
- 其他
详细说明