- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;2 P& |4 q9 w# A, W" l
- require_once __DIR__ . '/Workerman/Autoloader.php';- J1 O4 \/ h( j0 r' i: H! n, S
- . e, ~) F+ h. a& P K9 t
- $worker = new Worker();. I, z4 J7 v! T5 t& n2 q
- // 4个进程
; ]' Q6 A, l2 [: v. ^ - $worker->count = 4;7 H: c3 |, W( P9 Q8 l
- // 每个进程启动后在当前进程新增一个Worker监听+ G9 J9 B3 F% W! b9 y
- $worker->onWorkerStart = function($worker)$ N6 x& s& F# A; J0 [5 j; Q% @9 [) E! |
- {; q A7 c, P# G- `
- /**% r( @: L( V$ c2 S- Z
- * 4个进程启动的时候都创建2016端口的Worker: L' f) r! P. g7 U& m) _
- * 当执行到worker->listen()时会报Address already in use错误" V& S/ T1 U i- Y6 N5 D* Z" W1 u: q
- * 如果worker->count=1则不会报错% K3 I1 B ^$ R/ u
- */
" u' i2 w- u3 A+ r& w, o8 | - $inner_worker = new Worker('http://0.0.0.0:2016');' T+ H7 B+ r7 k/ i. |! l
- $inner_worker->onMessage = 'on_message';
% N* ^+ R8 M! D+ p" m - // 执行监听。这里会报Address already in use错误7 |7 D/ B& E- O, X# O+ \
- $inner_worker->listen();
! R% ?( l& w+ A! ~" ^7 V - };
" Y6 V* I$ c+ X- K7 H
: O' Z& @; O$ T- $worker->onMessage = 'on_message';4 B& s+ B, O& V1 {+ n
- + q; s+ K+ Y5 s( J9 U
- function on_message($connection, $data)
$ w& C3 u! p) C- T# | - {
! p6 ^5 }+ l4 U' f8 a7 g3 R - $connection->send("hello\n");- `; D$ O8 E3 f4 r* m
- }& V& R3 A& ?5 _0 D# e
) X- V7 B" o7 Y {- // 运行worker
# |7 g& P3 X* L) D - Worker::runAll();
: b8 Y: S) B T6 ]! j& W - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
0 Q3 \3 S5 _4 |8 t% D8 ` - 0 b+ p% o. W" w' {! y( H
- use Workerman\Worker;& x/ v! o. I. N- z* z- t q
- require_once './Workerman/Autoloader.php';- U$ ~8 _% V; S; {* @4 Q
- 8 S0 t9 ~1 M) z6 i: D9 C
- $worker = new Worker('text://0.0.0.0:2015');* h0 j' _: U' ^4 p V' w
- // 4个进程
& [8 M8 o5 ?: r4 X; O" \+ y - $worker->count = 4;' x4 e+ U; ^" t) z4 A
- // 每个进程启动后在当前进程新增一个Worker监听" `+ x/ ^2 V4 o N
- $worker->onWorkerStart = function($worker)( {$ g6 l5 v6 R% E( ~ |
- {
0 h* F( x5 H: j - $inner_worker = new Worker('http://0.0.0.0:2016');
' u0 c2 f; z0 w4 K# J - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)2 W' L, V+ S& ?" c
- $inner_worker->reusePort = true;
9 Q& V! G, Y: i1 ]) M. Y - $inner_worker->onMessage = 'on_message';
( _" x/ e# {' g$ N) ]. U( b" {0 _ - // 执行监听。正常监听不会报错
7 {4 B4 E# A2 @& R2 i* [' G - $inner_worker->listen();' P; h/ \2 G+ h% h' d
- };5 ^8 Q+ }' G& h+ y; M
- 4 c; z: {3 \# u! R/ g( |% }7 {% S
- $worker->onMessage = 'on_message';
! |# ?9 l* [0 h! b, Q, \
# E% j- b0 L. j# Z- q- function on_message($connection, $data)' ^5 W/ e& P- }) X. c# F
- {3 C& B1 ~1 U; F6 R' f0 K
- $connection->send("hello\n");
, L+ n# I# b2 Q - }
. q3 n3 i" F ^( Z9 c) V - [ `! w$ o+ S# a8 J8 A% o
- // 运行worker
8 x' r7 x. {/ W - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
& n2 U$ _6 ~6 E( B% ]% P, ]7 a - use Workerman\Worker;0 e5 h. T# _2 ^, h
- require_once './Workerman/Autoloader.php';8 X! \! ]8 Q! p
- // 初始化一个worker容器,监听1234端口
6 w+ U7 H; v5 h' j; L - $worker = new Worker('websocket://0.0.0.0:1234');
' t; `: X! n' ~7 |5 S3 U0 T - 6 j: n" ^: u( j; }8 F( S/ O8 O
- /** C; _, `1 m% P/ F; v
- * 注意这里进程数必须设置为1,否则会报端口占用错误
+ \0 i) [- _" R# r0 G - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
/ [3 n w5 D h% @: \ - */% [. m4 k' s. b1 g8 K4 T4 V# e
- $worker->count = 1;
" D, @( @2 V, W) c! n - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# K) o2 J% ]4 v( M2 f
- $worker->onWorkerStart = function($worker)
3 T9 U T, l1 l7 b% A- i; ? V# e* S - {
9 [1 O) W' X9 T. _# }8 [# f+ v - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
. \0 C1 t0 @$ |3 }! r0 M - $inner_text_worker = new Worker('text://0.0.0.0:5678');
$ g* d' t0 O- k8 f/ U1 W _4 J. o# c - $inner_text_worker->onMessage = function($connection, $buffer)( g* X9 A8 _$ G
- {
* |* b, O S Z b: M: A - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
" c" p; d; p" C8 y - $data = json_decode($buffer, true);! N3 S" D( g4 v
- $uid = $data['uid'];
% \' F0 o, |3 H - // 通过workerman,向uid的页面推送数据
' P; e4 o/ b: u - $ret = sendMessageByUid($uid, $buffer);
1 a( e' Z6 E3 @' N5 [ Q( Z - // 返回推送结果+ `1 T' G( L$ Q& m" X1 X
- $connection->send($ret ? 'ok' : 'fail');
' h7 a: v9 j" m. l: v1 ]1 Q - };' U" q( }& P2 j( T4 u
- // ## 执行监听 ##
3 V. v. y7 W0 d9 K# ~ - $inner_text_worker->listen();
0 u J# |2 f5 O) Z - };5 v8 G8 B. Y2 ^8 G( s/ p
- // 新增加一个属性,用来保存uid到connection的映射
+ o1 q" r4 ] h/ U" K - $worker->uidConnections = array();
( Q! A3 X9 s% K9 s# {( _0 m9 d+ i2 { - // 当有客户端发来消息时执行的回调函数
! n. Y& I2 q4 m- i: W7 W6 h - $worker->onMessage = function($connection, $data)" J) p& B6 l4 @- A
- {
$ v( G8 m E% a- I; p! y1 _1 n( C1 p - global $worker;
( g0 l7 q6 j4 g - // 判断当前客户端是否已经验证,既是否设置了uid) H: V8 t% j1 C: E( q
- if(!isset($connection->uid))
$ q3 D2 Q8 `* K' n% R! L9 N( l - {# J8 }8 O% ~. C; c( @/ r0 @; r0 G
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
5 Q! K; W7 Q8 p" y4 H- U - $connection->uid = $data;. j8 D( w' w2 _
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,7 B, d9 M9 Y8 n! l
- * 实现针对特定uid推送数据- R( d5 w' ?: V- I! {+ t
- */
2 c, j/ D2 X9 u8 [ - $worker->uidConnections[$connection->uid] = $connection;( g5 f+ x" z# S) f% l! N8 k2 \' V
- return;
$ k( p# H: }: d/ K+ O/ D! h - }
& j/ r+ X z" Z/ r - };
% S! p% I* b7 f! h: l) }7 r
6 @( a7 X) e% r$ H9 m- // 当有客户端连接断开时; t W. n1 [( a1 e) I$ i
- $worker->onClose = function($connection)3 q2 U" d" B0 s' ~
- {
. d: ?" Y; Q* |0 Q! H( q - global $worker;
# o* @+ k1 |" z5 u* Z8 G% _ - if(isset($connection->uid))9 J- R5 ^' Q ^* E$ h
- {
- @* }1 v2 Q2 o' {1 G: w7 L - // 连接断开时删除映射
! d1 e' [& i0 W% F. s, s - unset($worker->uidConnections[$connection->uid]);* D/ Z6 D' f7 ?4 [2 U p" |" _
- }0 L0 U* L- u6 C9 J
- };
; y0 q3 D' { M# M# H: m/ q - 5 P* k/ ^0 q! `+ P2 j
- // 向所有验证的用户推送数据9 e( ]4 W% ?( ~
- function broadcast($message)! C- x& D* b7 I0 W9 v% i4 {
- {
8 c+ S! K V0 Z- N1 R' d# t - global $worker;; w2 r$ _; F( u9 P; e
- foreach($worker->uidConnections as $connection)
( E! l" r% I4 z5 I- \$ N9 K, J Y - {
/ H- T( l7 l) _7 V5 R( F% q& ~ - $connection->send($message);7 `$ h% w4 _* m/ k+ ]: J
- }
+ `: `" u7 c/ T0 T6 z - }' R6 z9 s) F9 E& c7 I6 o
& S4 i- S7 V5 t! w+ i/ I1 G4 R- // 针对uid推送数据
# V' g& ?/ ?0 l6 H( Z9 J% q. J - function sendMessageByUid($uid, $message)
% W/ P. `+ F W3 U - {9 ]+ K" Z5 w7 m; e: W- t; c5 {1 W$ W
- global $worker;' g, |: U$ }5 {* d# h, s$ g( J
- if(isset($worker->uidConnections[$uid]))- [: W# P. l4 r1 G
- {* V! E9 X; `( r6 ]% U( y
- $connection = $worker->uidConnections[$uid];
2 a$ R( {9 h' X* ^; v3 v; U - $connection->send($message);' M( l. O9 K8 b
- return true;
& R( B6 O3 c: A- x: q" e - }
& `1 N: S* y5 X0 E! ]* T - return false;
* }, |, v" J) ^ - }- K# ]+ Z2 |) y/ p: @& y
- ' M9 ?7 k* k+ F$ w
- // 运行所有的worker, T4 z! l* E: v. n3 M v4 x
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');1 ^+ p6 }1 P3 H2 w/ I: e6 e
- ws.onopen = function(){- ]" V! w" k6 `# F4 P- m+ ^8 x$ G: w
- var uid = 'uid1';( J$ p* p ^5 Y9 I( K" a& {9 w
- ws.send(uid);
, a# U. w( \/ H% F# d: O - };
1 U4 X( m4 Z* P/ I' l6 D - ws.onmessage = function(e){& O6 o7 C7 H; a$ P% F) {. |1 O+ t
- alert(e.data);7 D; W) i7 ?/ {" M ?2 f
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
$ p9 s$ R2 b* ` M! U9 [ - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
$ I7 S+ `, m/ `3 V7 {7 h: D; H - // 推送的数据,包含uid字段,表示是给这个uid推送% {3 o) L; s( e, p
- $data = array('uid'=>'uid1', 'percent'=>'88%');3 I7 ^, h6 h' d) z2 ?) P' _. j( E
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符5 L0 q6 k1 f5 Q- p
- fwrite($client, json_encode($data)."\n");
0 j( M( R4 y+ S - // 读取推送结果$ {$ i+ q6 }4 j7 i( j% P6 T" j7 a
- echo fread($client, 8192);
复制代码
9 P3 Z( J5 z4 @( Z: F3 f, ]$ ^* ]9 n u' V
|