apns-php
 All Data Structures Files Functions Variables Groups Pages
Server.php
Go to the documentation of this file.
1 <?php
2 /**
3  * @file
4  * ApnsPHP_Push_Server class definition.
5  *
6  * LICENSE
7  *
8  * This source file is subject to the new BSD license that is bundled
9  * with this package in the file LICENSE.txt.
10  * It is also available through the world-wide-web at this URL:
11  * http://code.google.com/p/apns-php/wiki/License
12  * If you did not receive a copy of the license and are unable to
13  * obtain it through the world-wide-web, please send an email
14  * to aldo.armiento@gmail.com so we can send you a copy immediately.
15  *
16  * @author (C) 2010 Aldo Armiento (aldo.armiento@gmail.com)
17  * @version $Id$
18  */
19 
20 /**
21  * @defgroup ApnsPHP_Push_Server Server
22  * @ingroup ApnsPHP_Push
23  */
24 
25 /**
26  * The Push Notification Server Provider.
27  *
28  * The class manages multiple Push Notification Providers and an inter-process message
29  * queue. This class is useful to parallelize and speed-up send activities to Apple
30  * Push Notification service.
31  *
32  * @ingroup ApnsPHP_Push_Server
33  */
35 {
const MAIN_LOOP_USLEEP = 200000; /**< @type integer Main loop sleep time in micro seconds (200000 = 0.2 seconds). */
const SHM_SIZE = 524288; /**< @type integer Shared memory size in bytes (524288 = 512 KiB) useful to store message queues. */
const SHM_MESSAGES_QUEUE_KEY_START = 1000; /**< @type integer Message queue start identifier for messages. For every process 1 is added to this number. */
const SHM_ERROR_MESSAGES_QUEUE_KEY = 999; /**< @type integer Message queue identifier for not delivered messages. */

protected $_nProcesses = 3; /**< @type integer The number of processes to start. */
protected $_aPids = array(); /**< @type array Array of process PIDs. */
protected $_nParentPid; /**< @type integer The parent process id. */
protected $_nCurrentProcess; /**< @type integer Cardinal process number (0, 1, 2, ...). */
// Explicitly starts at 0: the counter is incremented, decremented and compared
// with "> 0", and PHP's decrement operator has no effect on a null value.
protected $_nRunningProcesses = 0; /**< @type integer The number of running processes. */

protected $_hShm; /**< @type resource Shared memory. */
protected $_hSem; /**< @type resource Semaphore. */
49 
50  /**
51  * Constructor.
52  *
53  * @param $nEnvironment @type integer Environment.
54  * @param $sProviderCertificateFile @type string Provider certificate file
55  * with key (Bundled PEM).
56  * @throws ApnsPHP_Push_Server_Exception if it is unable to
57  * get the Shared Memory Segment or the Semaphore ID.
58  */
public function __construct($nEnvironment, $sProviderCertificateFile)
{
    parent::__construct($nEnvironment, $sProviderCertificateFile);

    // The PID recorded here is what onSignal(), onShutdown() and _mainLoop()
    // later compare against to tell the parent apart from forked children.
    $this->_nParentPid = posix_getpid();

    // Shared memory segment holding the per-process and error message queues.
    $this->_hShm = shm_attach(mt_rand(), self::SHM_SIZE);
    if ($this->_hShm === false) {
        throw new ApnsPHP_Push_Server_Exception(
            'Unable to get shared memory segment'
        );
    }

    // Semaphore acquired around every read/write of the shared queues.
    $this->_hSem = sem_get(mt_rand());
    if ($this->_hSem === false) {
        // The shutdown handler is not registered yet, so release the segment
        // attached above before bailing out; otherwise it would be left behind.
        @shm_remove($this->_hShm) && @shm_detach($this->_hShm);
        throw new ApnsPHP_Push_Server_Exception(
            'Unable to get semaphore id'
        );
    }

    // Clean up shared memory and semaphore when the parent process ends.
    register_shutdown_function(array($this, 'onShutdown'));

    // Track exited children, and handle termination requests.
    pcntl_signal(SIGCHLD, array($this, 'onChildExited'));
    foreach (array(SIGTERM, SIGQUIT, SIGINT) as $nSignal) {
        pcntl_signal($nSignal, array($this, 'onSignal'));
    }
}
85 
86  /**
87  * Checks if the server is running and calls signal handlers for pending signals.
88  *
89  * Example:
90  * @code
91  * while ($Server->run()) {
92  * // do something...
93  * usleep(200000);
94  * }
95  * @endcode
96  *
97  * @return @type boolean True if the server is running.
98  */
public function run()
{
    // Let pending signal handlers run first so the running-process
    // counter reflects children that have already exited.
    pcntl_signal_dispatch();

    $bRunning = ($this->_nRunningProcesses > 0);

    return $bRunning;
}
104 
105  /**
106  * Waits until a forked process has exited and decreases the current running
107  * process number.
108  */
public function onChildExited()
{
    // Collect every child that has already terminated. WNOHANG makes the
    // call return immediately when no further child is waiting.
    for (;;) {
        $nExitedPid = pcntl_waitpid(-1, $nStatus, WNOHANG);
        if ($nExitedPid <= 0) {
            break;
        }
        $this->_nRunningProcesses--;
    }
}
115 
116  /**
117  * When a child (not the parent) receives a signal of type TERM, QUIT or INT
118  * it exits from the current process and decreases the current running process number.
119  *
120  * @param $nSignal @type integer Signal number.
121  */
public function onSignal($nSignal)
{
    // Anything other than a termination request is only logged.
    if (!in_array($nSignal, array(SIGTERM, SIGQUIT, SIGINT))) {
        $this->_log("INFO: Ignored signal #{$nSignal}.");
        return;
    }

    // The parent takes no action here; only forked children shut down.
    $nPid = posix_getpid();
    if ($nPid == $this->_nParentPid) {
        return;
    }

    $this->_log("INFO: Child $nPid received signal #{$nSignal}, shutdown...");
    $this->_nRunningProcesses--;
    exit(0);
}
139 
140  /**
141  * When the parent process exits, cleans shared memory and semaphore.
142  *
143  * This is called using 'register_shutdown_function' pattern.
144  * @see http://php.net/register_shutdown_function
145  */
public function onShutdown()
{
    // Forked children inherit this shutdown handler; only the parent cleans up.
    if (posix_getpid() != $this->_nParentPid) {
        return;
    }

    $this->_log('INFO: Parent shutdown, cleaning memory...');

    // Detach only when the segment was removed successfully.
    if (@shm_remove($this->_hShm)) {
        @shm_detach($this->_hShm);
    }
    @sem_remove($this->_hSem);
}
154 
155  /**
156  * Set the total processes to start, default is 3.
157  *
158  * @param $nProcesses @type integer Processes to start up.
159  */
public function setProcesses($nProcesses)
{
    $nRequested = (int)$nProcesses;

    // Zero or negative values are silently ignored; the current setting is kept.
    if ($nRequested > 0) {
        $this->_nProcesses = $nRequested;
    }
}
168 
169  /**
170  * Starts the server, forking all processes, and returns immediately.
171  *
172  * Every forked process connects to the Apple Push Notification Service on start
173  * and then enters the main loop.
174  */
public function start()
{
    for ($nIndex = 0; $nIndex < $this->_nProcesses; $nIndex++) {
        // Set before forking so the child knows its own cardinal number.
        $this->_nCurrentProcess = $nIndex;

        $nPid = pcntl_fork();
        $this->_aPids[$nIndex] = $nPid;

        if ($nPid == -1) {
            $this->_log('WARNING: Could not fork');
            continue;
        }

        if ($nPid > 0) {
            // Parent process: account for the new child and keep forking.
            $this->_log("INFO: Forked process PID {$nPid}");
            $this->_nRunningProcesses++;
            continue;
        }

        // Child process: connect, serve its queue, then terminate. It never
        // returns to the forking loop.
        try {
            parent::connect();
        } catch (ApnsPHP_Exception $e) {
            $this->_log('ERROR: ' . $e->getMessage() . ', exiting...');
            exit(1);
        }
        $this->_mainLoop();
        parent::disconnect();
        exit(0);
    }
}
200 
201  /**
202  * Adds a message to the inter-process message queue.
203  *
204  * Messages are added to the queues in a round-robin fashion starting from the
205  * first process to the last.
206  *
207  * @param $message @type ApnsPHP_Message The message.
208  */
public function add(ApnsPHP_Message $message)
{
    // Round-robin cursor: the index of the process queue receiving the next message.
    static $n = 0;
    $n = ($n >= $this->_nProcesses) ? 0 : $n;

    $nBaseKey = self::SHM_MESSAGES_QUEUE_KEY_START;

    // Read-modify-write of the shared queue, performed while holding the semaphore.
    sem_acquire($this->_hSem);
    $aPending = $this->_getQueue($nBaseKey, $n);
    $aPending[] = $message;
    $this->_setQueue($nBaseKey, $n, $aPending);
    sem_release($this->_hSem);

    $n++;
}
222 
223  /**
224  * Returns messages in the message queue.
225  *
226  * When a message is successfully sent or has reached the maximum retry time, it is
227  * removed from the message queue and inserted in the Errors container. Use the
228  * getErrors() method to retrieve messages with delivery error(s).
229  *
230  * @param $bEmpty @type boolean @optional Empty message queue.
231  * @return @type array Array of messages left on the queue.
232  */
public function getQueue($bEmpty = true)
{
    $nBaseKey = self::SHM_MESSAGES_QUEUE_KEY_START;
    $aMessages = array();

    sem_acquire($this->_hSem);
    $nProcess = 0;
    while ($nProcess < $this->_nProcesses) {
        // Gather this process' pending messages, optionally clearing its queue.
        $aPending = $this->_getQueue($nBaseKey, $nProcess);
        $aMessages = array_merge($aMessages, $aPending);
        if ($bEmpty) {
            $this->_setQueue($nBaseKey, $nProcess);
        }
        $nProcess++;
    }
    sem_release($this->_hSem);

    return $aMessages;
}
246 
247  /**
248  * Returns messages not delivered to the end user because one (or more) error
249  * occurred.
250  *
251  * @param $bEmpty @type boolean @optional Empty message container.
252  * @return @type array Array of messages not delivered because one or more errors
253  * occurred.
254  */
public function getErrors($bEmpty = true)
{
    $nErrorKey = self::SHM_ERROR_MESSAGES_QUEUE_KEY;

    sem_acquire($this->_hSem);

    // Snapshot the shared error container, then optionally reset it.
    $aErrors = $this->_getQueue($nErrorKey);
    if ($bEmpty) {
        $this->_setQueue($nErrorKey, 0, array());
    }

    sem_release($this->_hSem);

    return $aErrors;
}
265 
266  /**
267  * The process main loop.
268  *
269  * During the main loop: the per-process error queue is read and the common error message
270  * container is populated; the per-process message queue is spooled (message from
271  * this queue is added to ApnsPHP_Push queue and delivered).
272  */
protected function _mainLoop()
{
    $nErrorKey = self::SHM_ERROR_MESSAGES_QUEUE_KEY;
    $nQueueKey = self::SHM_MESSAGES_QUEUE_KEY_START;

    for (;;) {
        pcntl_signal_dispatch();

        // Stop as soon as this process' parent is no longer the recorded one.
        if (posix_getppid() != $this->_nParentPid) {
            $this->_log("INFO: Parent process {$this->_nParentPid} died unexpectedly, exiting...");
            break;
        }

        sem_acquire($this->_hSem);

        // Append this process' delivery errors to the shared error container.
        $aSharedErrors = $this->_getQueue($nErrorKey);
        $aLocalErrors = parent::getErrors();
        $this->_setQueue($nErrorKey, 0, array_merge($aSharedErrors, $aLocalErrors));

        // Move the messages assigned to this process into the local send queue,
        // then clear the shared slot.
        $aQueue = $this->_getQueue($nQueueKey, $this->_nCurrentProcess);
        foreach ($aQueue as $message) {
            parent::add($message);
        }
        $this->_setQueue($nQueueKey, $this->_nCurrentProcess);

        sem_release($this->_hSem);

        $nMessages = count($aQueue);
        if ($nMessages <= 0) {
            // Nothing to deliver: sleep before polling again.
            usleep(self::MAIN_LOOP_USLEEP);
            continue;
        }

        $this->_log('INFO: Process ' . ($this->_nCurrentProcess + 1) . " has {$nMessages} messages, sending...");
        parent::send();
    }
}
304 
305  /**
306  * Returns the queue from the shared memory.
307  *
308  * @param $nQueueKey @type integer The key of the queue stored in the shared
309  * memory.
310  * @param $nProcess @type integer @optional The process cardinal number.
311  * @return @type array Array of messages from the queue.
312  */
protected function _getQueue($nQueueKey, $nProcess = 0)
{
    // The shared memory variable key is the queue key offset by the process number.
    $nShmKey = $nQueueKey + $nProcess;

    // A queue that was never stored is reported as empty.
    return shm_has_var($this->_hShm, $nShmKey)
        ? shm_get_var($this->_hShm, $nShmKey)
        : array();
}
320 
321  /**
322  * Store the queue into the shared memory.
323  *
324  * @param $nQueueKey @type integer The key of the queue to store in the shared
325  * memory.
326  * @param $nProcess @type integer @optional The process cardinal number.
327  * @param $aQueue @type array @optional The queue to store into shared memory.
328  * The default value is an empty array, useful to empty the queue.
329  * @return @type boolean True on success, false otherwise.
330  */
protected function _setQueue($nQueueKey, $nProcess = 0, $aQueue = array())
{
    // Any non-array value is stored as an empty queue.
    $aToStore = is_array($aQueue) ? $aQueue : array();

    // The shared memory variable key is the queue key offset by the process number.
    $nShmKey = $nQueueKey + $nProcess;

    return shm_put_var($this->_hShm, $nShmKey, $aToStore);
}
338 }
void onSignal(integer $nSignal)
When a child (not the parent) receive a signal of type TERM, QUIT or INT exits from the current proce...
Definition: Server.php:122
integer $_nProcesses
The number of processes to start.
Definition: Server.php:41
array _getQueue(integer $nQueueKey, integer $nProcess=0)
Returns the queue from the shared memory.
Definition: Server.php:313
integer $_nParentPid
The parent process id.
Definition: Server.php:43
resource $_hShm
Shared memory.
Definition: Server.php:47
const SHM_MESSAGES_QUEUE_KEY_START
integer Message queue start identifier for messages.
Definition: Server.php:38
array getQueue(boolean $bEmpty=true)
Returns messages in the message queue.
Definition: Server.php:233
void start()
Starts the server forking all processes and return immediately.
Definition: Server.php:175
array $_aPids
Array of process PIDs.
Definition: Server.php:42
void __construct(integer $nEnvironment, string $sProviderCertificateFile)
Constructor.
Definition: Server.php:59
Exception class.
Definition: Exception.php:25
const SHM_ERROR_MESSAGES_QUEUE_KEY
integer Message queue identifier for not delivered messages.
Definition: Server.php:39
void add(ApnsPHP_Message $message)
Adds a message to the inter-process message queue.
Definition: Server.php:209
void setProcesses(integer $nProcesses)
Set the total processes to start, default is 3.
Definition: Server.php:160
The Push Notification Server Provider.
Definition: Server.php:34
const SHM_SIZE
integer Shared memory size in bytes useful to store message queues.
Definition: Server.php:37
void _log(string $sMessage)
Logs a message through the Logger.
Definition: Abstract.php:414
boolean run()
Checks if the server is running and calls signal handlers for pending signals.
Definition: Server.php:99
void _mainLoop()
The process main loop.
Definition: Server.php:273
The Push Notification Provider.
Definition: Push.php:33
const MAIN_LOOP_USLEEP
integer Main loop sleep time in micro seconds.
Definition: Server.php:36
void onChildExited()
Waits until a forked process has exited and decreases the current running process number...
Definition: Server.php:109
void onShutdown()
When the parent process exits, cleans shared memory and semaphore.
Definition: Server.php:146
resource $_hSem
Semaphore.
Definition: Server.php:48
integer $_nCurrentProcess
Cardinal process number (0, 1, 2, ...).
Definition: Server.php:44
array getErrors(boolean $bEmpty=true)
Returns messages not delivered to the end user because one (or more) error occurred.
Definition: Server.php:255
boolean _setQueue(integer $nQueueKey, integer $nProcess=0, array $aQueue=array())
Store the queue into the shared memory.
Definition: Server.php:331
The Push Notification Message.
Definition: Message.php:34
integer $_nRunningProcesses
The number of running processes.
Definition: Server.php:45