48 #include <arpa/inet.h>
54 char *&recv_addr,
char *&recv_port,
55 char *&src_addr,
char *&src_port,
62 interpreter.
init(argc, argv);
69 interpreter.
reinterpret(
"-num_threads", num_threads);
70 interpreter.
reinterpret(
"-num_packets", num_inflight_packets);
71 interpreter.
reinterpret(
"-recv_buf_size", recvfrm_buf_size);
77 printf(
"The following arguments were not interpreted:\n");
81 printf(
"%s\n", t.
arg);
87 if (recv_addr == NULL)
89 printf(
"Please use \"-addr\" to provide a receiving address, "
90 "\"localhost\" or a local network card IPv4 address.\n");
93 if (recv_port == NULL)
95 printf(
"Please use \"-port\" to provide a port number.\n");
100 printf(
"Please set \"-num_threads\" to 1 or more.\n");
103 if (num_inflight_packets < 1)
105 printf(
"Please set \"-num_packets\" to 1 or more.\n");
113int main(
int argc,
char* argv[])
115 char *recv_addr = NULL;
116 char *recv_port = NULL;
117 char *src_addr = NULL;
118 char *src_port = NULL;
119 char *target_name = NULL;
123 bool blocking =
false;
129 "The following arguments are necessary:\n"
130 " -addr <receiving IPv4 address>, or\n"
131 " The address should be either localhost, or\n"
132 " a local network card IPv4 address\n"
133 " example: -addr 127.0.0.1\n"
134 " -port <listening port>\n"
136 "The following arguments are options:\n"
137 " -src_addr <source ipv4 address>, packets from other sources\n"
138 " will be ignored. If not specified, then packets\n"
139 " from any source are accepted.\n"
140 " -src_port <source port>, packets from other source ports are\n"
141 " ignored. If not specified, then packets from any\n"
142 " port are accepted -- I would recommend not leaving\n"
144 " -recv_buf_size <integer> recvfrom buffer size; default is 65536.\n"
145 " This is the size of the operating system's receive\n"
146 " buffer, before packets are picked by the program.\n"
147 " Larger buffers reduces the likelihood that a packet\n"
148 " is dropped before the program has a chance to pick it.\n"
149 " -blocking sets the receiving socket blocking mode to blocking.\n"
150 " The default mode is non-blocking. A blocking socket\n"
151 " increases the likelihood of not receiving some\n"
152 " packets; this is because the thread get into sleep\n"
153 " state, and therefore takes sometime to wakeup. A\n"
154 " non-blocking socket increase power consumption,\n"
155 " because it prevents the thread from sleeping.\n"
156 " -num_threads <integer> number of threads for decoding and\n"
157 " displaying files. This number also determines the\n"
158 " number of in-flight files, not completely\n"
159 " saved/processed yet. The number of files is set to\n"
160 " number of threads + 1\n"
161 " -num_packets <integer> number of in-flight packets; this is a\n"
162 " window of packets in which packets can be re-ordered.\n"
163 " -o <string> target file name without extension; the same\n"
164 " printf formating can be used. For example,\n"
165 " output_%%05d. An extension will be added, either .j2c\n"
166 " for original frames, or .ppm for decoded images.\n"
167 " -quiet use to stop printing informative messages.\n."
172 if (!
get_arguments(argc, argv, recv_addr, recv_port, src_addr, src_port,
173 target_name, num_threads, num_inflight_packets,
174 recvfrm_buf_size, blocking, quiet))
181 thread_pool.
init(num_threads);
183 frames_handler.
init(quiet, target_name, &thread_pool);
185 packets_handler.
init(quiet, num_inflight_packets, &frames_handler);
189 struct sockaddr_in server;
191 server.sin_family = AF_INET;
192 const char *p = recv_addr;
193 const char localhost[] =
"127.0.0.1";
194 if (strcmp(recv_addr,
"localhost") == 0)
196 int result = inet_pton(AF_INET, p, &server.sin_addr);
198 OJPH_ERROR(0x02000001,
"Please provide a valid IPv4 address when "
199 "using \"-addr,\" the provided address %s is not valid",
203 if (port_number == 0)
204 OJPH_ERROR(0x02000002,
"Please provide a valid port number. "
205 "The number you provided is %d", recv_port);
206 server.sin_port = htons(port_number);
211 s = smanager.
create_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
215 OJPH_ERROR(0x02000003,
"Could not create socket: %s", err.data());
219 if (::setsockopt(s.
intern(), SOL_SOCKET, SO_RCVBUF,
220 (
char*)&recvfrm_buf_size,
sizeof(recvfrm_buf_size)) == -1)
224 "Failed to expand receive buffer: %s", err.data());
232 "Failed to set the socket's blocking mode to %s, with error %s",
233 blocking ?
"blocking" :
"non-blocking", err.data());
237 if(bind(s.
intern(), (
struct sockaddr *)&server,
sizeof(server)) == -1)
241 "Could not bind address to socket: %s", err.data());
245 constexpr int buf_size = 128;
248 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
252 "Error converting source address: %s", err.data());
254 printf(
"Listining on %s, port %d\n", t, ntohs(server.sin_port));
261 const char *p = src_addr;
262 const char localhost[] =
"127.0.0.1";
263 if (strcmp(src_addr,
"localhost") == 0)
265 struct sockaddr_in t;
266 int result = inet_pton(AF_INET, p, &t.sin_addr);
268 OJPH_ERROR(0x02000005,
"Please provide a valid IPv4 address when "
269 "using \"-src_addr,\" the provided address %s is not valid",
278 OJPH_ERROR(0x02000006,
"Please provide a valid port number. "
279 "The number you provided is %d", src_port);
283 struct sockaddr_in si_other;
284 socklen_t socklen =
sizeof(si_other);
285 bool src_printed =
false;
290 if (packet == NULL || packet->
num_bytes != 0)
291 packet = packets_handler.
exchange(packet);
297 int num_bytes = (int)recvfrom(s.
intern(), (
char*)packet->
data,
298 packet->
max_size, 0, (
struct sockaddr*)&si_other, &socklen);
306 OJPH_INFO(0x02000003,
"Failed to receive data: %s", err.data());
311 if ((src_addr && saddr != smanager.
get_addr(si_other)) ||
312 (src_port && sport != si_other.sin_port)) {
313 constexpr int buf_size = 128;
316 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
320 "Error converting source address: %s", err.data());
322 printf(
"Source mistmatch %s, port %d\n",
323 t, ntohs(si_other.sin_port));
329 if (last_time_stamp == 0)
332 if (!quiet && !src_printed)
334 constexpr int buf_size = 128;
337 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
341 "Error converting source address: %s", err.data());
343 printf(
"Receiving data from %s, port %d\n",
344 t, ntohs(si_other.sin_port));
353 ojph::ui32 total_frames = 0, trunc_frames = 0, lost_frames = 0;
354 frames_handler.
get_stats(total_frames, trunc_frames, lost_frames);
356 printf(
"Total frame %d, tuncated frames %d, lost frames %d, "
358 total_frames, trunc_frames, lost_frames, lost_packets);
363 catch (
const std::exception& e)
365 const char *p = e.what();
366 if (strncmp(p,
"ojph error", 10) != 0)
void init(int argc, char *argv[])
void reinterpret(const char *str, int &val)
argument get_argument_zero()
argument get_next_avail_argument(const argument &arg)
A small wrapper for some Winsock2 functionality.
std::string get_error_message(int errnum)
Abstracts obtaining a textual message for an errnum.
static ui32 get_addr(const sockaddr_in &addr)
Abstractly obtains the 32-bit IPv4 address integer.
socket create_socket(int domain, int type, int protocol)
Abstracts socket creation.
std::string get_last_error_message()
Abstracts obtaining a textual message for GetLastError/errno.
int get_last_error()
Abstracts getting the last error or errno.
A small wrapper for a socket that only abstracts Winsock2.
ojph_socket intern()
provides access to the internal socket handle
void close()
Abstracts socket closing function.
bool set_blocking_mode(bool block)
Sets the blocking mode.
Assumes packets arrive in order.
void get_stats(ui32 &total_frames, ui32 &trunc_frames, ui32 &lost_frames)
call this function to collect statistics about frames
void init(bool quiet, const char *target_name, thds::thread_pool *thread_pool)
call this function to initialize this object
Interprets new packets, buffers them if needed.
rtp_packet * exchange(rtp_packet *p)
Call this function to get a packet from the packet chain.
ui32 get_num_lost_packets() const
This function provides information about the observed number of lost packets.
void init(bool quiet, ui32 num_packets, frames_handler *frames)
call this to initialize packets_handler
Implements a pool of threads, and can queue tasks.
void init(size_t num_threads)
Initializes the thread pool.
#define OJPH_INFO(t,...)
MACROs to insert file and line number for info, warning, and error.
#define OJPH_ERROR(t,...)
#define OJPH_INVALID_SOCKET
int main(int argc, char *argv[])
static bool get_arguments(int argc, char *argv[], char *&recv_addr, char *&recv_port, char *&src_addr, char *&src_port, char *&target_name, ojph::ui32 &num_threads, ojph::ui32 &num_inflight_packets, ojph::ui32 &recvfrm_buf_size, bool &blocking, bool &quiet)
Interprets RTP header and payload, and holds received packets.
ui32 num_bytes
number of bytes
ui8 data[max_size]
data in the packet
static constexpr int max_size
maximum packet size