59 public function __construct($nEnvironment, $sProviderCertificateFile)
61 parent::__construct($nEnvironment, $sProviderCertificateFile);
63 $this->_nParentPid = posix_getpid();
64 $this->_hShm = shm_attach(mt_rand(), self::SHM_SIZE);
65 if ($this->_hShm ===
false) {
67 'Unable to get shared memory segment'
71 $this->_hSem = sem_get(mt_rand());
72 if ($this->_hSem ===
false) {
74 'Unable to get semaphore id'
78 register_shutdown_function(array($this,
'onShutdown'));
80 pcntl_signal(SIGCHLD, array($this,
'onChildExited'));
81 foreach(array(SIGTERM, SIGQUIT, SIGINT) as $nSignal) {
82 pcntl_signal($nSignal, array($this,
'onSignal'));
101 pcntl_signal_dispatch();
102 return $this->_nRunningProcesses > 0;
111 while (pcntl_waitpid(-1, $nStatus, WNOHANG) > 0) {
112 $this->_nRunningProcesses--;
128 if (($nPid = posix_getpid()) != $this->_nParentPid) {
129 $this->
_log(
"INFO: Child $nPid received signal #{$nSignal}, shutdown...");
130 $this->_nRunningProcesses--;
135 $this->
_log(
"INFO: Ignored signal #{$nSignal}.");
148 if (posix_getpid() == $this->_nParentPid) {
149 $this->
_log(
'INFO: Parent shutdown, cleaning memory...');
150 @shm_remove($this->_hShm) && @shm_detach($this->_hShm);
151 @sem_remove($this->_hSem);
162 $nProcesses = (int)$nProcesses;
163 if ($nProcesses <= 0) {
166 $this->_nProcesses = $nProcesses;
178 $this->_nCurrentProcess = $i;
179 $this->_aPids[$i] = $nPid = pcntl_fork();
181 $this->
_log(
'WARNING: Could not fork');
182 }
else if ($nPid > 0) {
184 $this->
_log(
"INFO: Forked process PID {$nPid}");
185 $this->_nRunningProcesses++;
191 $this->
_log(
'ERROR: ' . $e->getMessage() .
', exiting...');
195 parent::disconnect();
212 if ($n >= $this->_nProcesses) {
215 sem_acquire($this->_hSem);
216 $aQueue = $this->
_getQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $n);
217 $aQueue[] = $message;
218 $this->
_setQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $n, $aQueue);
219 sem_release($this->_hSem);
236 sem_acquire($this->_hSem);
238 $aRet = array_merge($aRet, $this->
_getQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $i));
240 $this->
_setQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $i);
243 sem_release($this->_hSem);
257 sem_acquire($this->_hSem);
258 $aRet = $this->
_getQueue(self::SHM_ERROR_MESSAGES_QUEUE_KEY);
260 $this->
_setQueue(self::SHM_ERROR_MESSAGES_QUEUE_KEY, 0, array());
262 sem_release($this->_hSem);
276 pcntl_signal_dispatch();
278 if (posix_getppid() != $this->_nParentPid) {
279 $this->
_log(
"INFO: Parent process {$this->_nParentPid} died unexpectedly, exiting...");
283 sem_acquire($this->_hSem);
284 $this->
_setQueue(self::SHM_ERROR_MESSAGES_QUEUE_KEY, 0,
285 array_merge($this->
_getQueue(self::SHM_ERROR_MESSAGES_QUEUE_KEY), parent::getErrors())
288 $aQueue = $this->
_getQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $this->_nCurrentProcess);
289 foreach($aQueue as $message) {
290 parent::add($message);
292 $this->
_setQueue(self::SHM_MESSAGES_QUEUE_KEY_START, $this->_nCurrentProcess);
293 sem_release($this->_hSem);
295 $nMessages = count($aQueue);
296 if ($nMessages > 0) {
297 $this->
_log(
'INFO: Process ' . ($this->_nCurrentProcess + 1) .
" has {$nMessages} messages, sending...");
300 usleep(self::MAIN_LOOP_USLEEP);
315 if (!shm_has_var($this->_hShm, $nQueueKey + $nProcess)) {
318 return shm_get_var($this->_hShm, $nQueueKey + $nProcess);
331 protected function _setQueue($nQueueKey, $nProcess = 0, $aQueue = array())
333 if (!is_array($aQueue)) {
336 return shm_put_var($this->_hShm, $nQueueKey + $nProcess, $aQueue);
void onSignal(integer $nSignal)
When a child (not the parent) receives a signal of type TERM, QUIT or INT, it exits from the current proce...
integer $_nProcesses
The number of processes to start.
array _getQueue(integer $nQueueKey, integer $nProcess=0)
Returns the queue from the shared memory.
integer $_nParentPid
The parent process id.
resource $_hShm
Shared memory.
const SHM_MESSAGES_QUEUE_KEY_START
integer Message queue start identifier for messages.
array getQueue(boolean $bEmpty=true)
Returns messages in the message queue.
void start()
Starts the server forking all processes and return immediately.
array $_aPids
Array of process PIDs.
void __construct(integer $nEnvironment, string $sProviderCertificateFile)
Constructor.
const SHM_ERROR_MESSAGES_QUEUE_KEY
integer Message queue identifier for undelivered messages.
void add(ApnsPHP_Message $message)
Adds a message to the inter-process message queue.
void setProcesses(integer $nProcesses)
Sets the total number of processes to start; the default is 3.
The Push Notification Server Provider.
const SHM_SIZE
integer Shared memory size in bytes, used to store the message queues.
void _log(string $sMessage)
Logs a message through the Logger.
boolean run()
Checks if the server is running and calls signal handlers for pending signals.
void _mainLoop()
The process main loop.
The Push Notification Provider.
const MAIN_LOOP_USLEEP
integer Main loop sleep time in microseconds.
void onChildExited()
Waits until a forked process has exited and decreases the current running process number...
void onShutdown()
When the parent process exits, cleans shared memory and semaphore.
resource $_hSem
Semaphore.
integer $_nCurrentProcess
Cardinal process number (0, 1, 2, ...).
array getErrors(boolean $bEmpty=true)
Returns messages not delivered to the end user because one or more errors occurred.
boolean _setQueue(integer $nQueueKey, integer $nProcess=0, array $aQueue=array())
Store the queue into the shared memory.
The Push Notification Message.
integer $_nRunningProcesses
The number of running processes.