- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;1 G. Q% p2 L3 W& A' [
- require_once __DIR__ . '/Workerman/Autoloader.php';
# G6 @ k# m2 D' J* t - / Y) k( g9 E9 A1 s( y8 Y
- $worker = new Worker();) M4 B2 p( h* H: \% }
- // 4个进程
; N. u! y% v* J4 G3 d - $worker->count = 4;! x6 e) @$ ]! _% ?/ L6 u) g
- // 每个进程启动后在当前进程新增一个Worker监听9 t0 d$ ^! o' x: E
- $worker->onWorkerStart = function($worker)" h( W+ _# ~, I1 {7 r
- {
% L6 \0 V1 O$ a. n3 g) n - /**) M9 _( _2 e m, U- n
- * 4个进程启动的时候都创建2016端口的Worker y$ g! Y5 A/ h+ ^) B
- * 当执行到worker->listen()时会报Address already in use错误7 j" T) P7 ~+ ~( p
- * 如果worker->count=1则不会报错. Q( Z) D# a2 s7 t
- */
: P k0 t: B5 e6 g" r+ { - $inner_worker = new Worker('http://0.0.0.0:2016');
. @" G" t% p% y9 _ V1 |" H - $inner_worker->onMessage = 'on_message';; f" K% C$ {6 k, c N
- // 执行监听。这里会报Address already in use错误
" i7 V' \& i& O7 ~. b @2 x - $inner_worker->listen();! \- o( g$ ]# i6 I$ O
- };- m8 k; J5 D& u" }# k: ~3 `
- # Z/ D+ q) u" X7 F9 ^
- $worker->onMessage = 'on_message';
! Z5 R! [ H% B \( C
( e f( \, F0 B; w5 ]- function on_message($connection, $data)
$ n+ G# C* J2 }9 G# Y - {1 G) `1 h# H$ e* ]0 N
- $connection->send("hello\n");
1 M$ U$ k" x5 ? C' T" g# i - }" {8 x4 @: p! |
- # j a3 G5 x: k e3 j
- // 运行worker7 s6 x( y% }$ [2 E7 V* l
- Worker::runAll();. I8 i3 v, X7 J9 a
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 H' k9 L5 W) Q, o" @1 @) }0 v$ E* W
- * `. n. H0 b2 a2 m
- use Workerman\Worker;1 o% O' s- y h! e% ?
- require_once './Workerman/Autoloader.php';1 O$ H: b" ]9 l% ?, I
- / C4 D' G0 C: W3 r" K+ ]3 s
- $worker = new Worker('text://0.0.0.0:2015');1 a6 ~ E& S- [1 Z+ w2 }
- // 4个进程% s; s+ o, N$ a* A& [! M) ^
- $worker->count = 4;
8 ]3 }) j$ H2 l- v' U - // 每个进程启动后在当前进程新增一个Worker监听
5 j6 N4 ^: b8 x, t9 ^( z; Q - $worker->onWorkerStart = function($worker): K; ^( B1 R4 E1 m+ I, P
- {
( y, j$ Y" e9 d0 _& R - $inner_worker = new Worker('http://0.0.0.0:2016');) H' S6 U7 S- k
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
3 i5 f. H' m- K9 _5 l* e* h - $inner_worker->reusePort = true;
( M2 J4 T+ j4 f' [; K% G - $inner_worker->onMessage = 'on_message';0 @4 i8 J& X" P% b8 ?6 j9 v. T
- // 执行监听。正常监听不会报错# |0 m0 f% r( |
- $inner_worker->listen();
. d: v! o& ^5 W& P& ? - };
. a# L2 r' B, J3 G
5 a! r. t0 f/ S- $worker->onMessage = 'on_message';1 F* Y. P! d( \8 m5 a
- 6 ` C1 O/ t* `0 X
- function on_message($connection, $data)- h9 t- i. M1 M6 p6 j0 H
- {5 {1 o/ a9 A7 I6 e
- $connection->send("hello\n");) K# _8 o1 X) |: n
- }
- `1 d; U, \# x; N$ I - . O! k' X+ j2 Q* ]: u! Q3 `
- // 运行worker
# P& [3 C. \3 Y# @3 }6 Z4 d, h$ { - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php8 v! m+ p. l B& P' K
- use Workerman\Worker;4 K& `' Z1 z9 X; S" o
- require_once './Workerman/Autoloader.php';
0 B7 M! d" C, t' h! R4 P% @! ~ - // 初始化一个worker容器,监听1234端口
7 v- q4 w( ]8 K1 I - $worker = new Worker('websocket://0.0.0.0:1234');5 @% b$ I' p8 j) W
% _7 C5 s. E! _7 T2 J8 s- /*
4 e0 v, P: B9 ?/ t9 @ - * 注意这里进程数必须设置为1,否则会报端口占用错误3 B& N! ?( N0 v( J% p: P
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)) f3 R! @2 C3 X( S, D% G
- */5 ^ G- c3 F$ g# T0 e6 N
- $worker->count = 1;1 ]$ ~% r" N8 _- Z7 ~
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口5 G2 p" V/ J, D6 P- T' a0 _, p
- $worker->onWorkerStart = function($worker)
$ V: m% i D+ Q+ Q; Y6 E - {, k0 i! x7 \% n
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符# c) w( }5 ^1 u( L2 H
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
- d9 ^9 L, H/ W - $inner_text_worker->onMessage = function($connection, $buffer)3 \" J+ _8 z9 Q1 q3 U4 N
- {
7 I% l' e9 m6 O. Q - // $data数组格式,里面有uid,表示向那个uid的页面推送数据! T u8 K" _2 G4 V+ e9 |5 u& b
- $data = json_decode($buffer, true);
. K3 Y* b8 } W - $uid = $data['uid'];; |9 k( c+ I. r" a) g' _; X
- // 通过workerman,向uid的页面推送数据7 g5 \! w |, U# V+ a d0 V
- $ret = sendMessageByUid($uid, $buffer);% q' M9 l1 r( T) B9 g" C* C
- // 返回推送结果5 F. `3 x: [" Q7 b) U
- $connection->send($ret ? 'ok' : 'fail');4 `$ k1 q3 J' R5 M3 Z0 H7 I/ n
- };
9 A, L6 f K6 Z* e' f2 p - // ## 执行监听 ##
; T j) v1 S) }: v- @ - $inner_text_worker->listen();
1 H5 ?% W h( ]- @6 J - };- H5 A% [4 t2 S3 v' x( T
- // 新增加一个属性,用来保存uid到connection的映射+ |* }8 h9 T: E4 |2 ^
- $worker->uidConnections = array();
( t1 c9 Y; b2 t9 K3 D4 N, e; ~ - // 当有客户端发来消息时执行的回调函数! Q+ r9 [# N# \) [! |
- $worker->onMessage = function($connection, $data)# Y: w* R1 W4 l2 _! D- @1 O
- {
$ p6 R3 m( T7 I. h7 Q - global $worker;; s8 U$ A- J$ U$ t, Y
- // 判断当前客户端是否已经验证,既是否设置了uid
" `0 ~# I5 S$ u% {" o. \( D - if(!isset($connection->uid))
: x# |5 i; _7 D) K' _( T! a7 d' i! Q - {
( k+ F, f; g, L/ ]8 D( l - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 k5 |5 I l9 \$ b9 H7 C
- $connection->uid = $data;, k% o+ e6 _$ u+ H U
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,2 Q! @& \; L. P$ U* u
- * 实现针对特定uid推送数据
" O" A$ i5 G! ` - */
1 n N& C: r. |, n7 C) L$ y - $worker->uidConnections[$connection->uid] = $connection;
9 }4 O. h7 V. e6 a3 P: w. d c - return;
4 Q' q2 w+ P% U0 { - }3 v" W7 \- Z5 i5 B
- };/ r: _9 E* ?. n9 `+ b# @! B, A
- , M$ l3 L, o: b+ L: s( Q: p
- // 当有客户端连接断开时
( y; J$ y; l3 M" \; x4 p - $worker->onClose = function($connection)/ `; U3 j! Y3 o$ M! p$ g; D( m
- {
. E7 M/ b: o2 E# G( B+ v - global $worker;5 J. C0 Q+ h2 S5 G/ X. A
- if(isset($connection->uid))
1 t# }1 Q F) [6 x - {$ F& Y, V" p; j1 F, `! R# Y
- // 连接断开时删除映射
# u6 j F% Q. M: R) q) |; ~& b - unset($worker->uidConnections[$connection->uid]);* N+ r P, C; u6 _2 |: s
- }
0 r' e& m: m$ @& P2 d/ l - };
% {3 V. a8 V3 Q5 o6 t4 q - . u" ?2 {% W# E5 ?
- // 向所有验证的用户推送数据
$ W& u: ?7 \8 E0 ` T& S; T$ [. j - function broadcast($message)
2 U& j8 z: j) t8 G% e, L - {! i2 P9 {* d( h+ Y0 K# u3 ]
- global $worker;
9 `6 _+ A4 z2 c' h% i6 a8 A - foreach($worker->uidConnections as $connection)
0 _6 y) b0 ^, J - {/ M# ^8 f: `+ l& }: C$ L" Y
- $connection->send($message);
: `# n: e a; b2 c- \ - }
9 G8 A+ K3 [1 s, d - }
% n% w; q- @# L+ K& x" H; e' @; s - : S h" _- K" f
- // 针对uid推送数据) K; l: R! Q8 K, d' j$ C
- function sendMessageByUid($uid, $message)
K2 R4 h% |0 O( y i5 d - {
# J7 X0 |1 @' _# D0 u# Y) k - global $worker;' w( m! Z$ ?+ i. q
- if(isset($worker->uidConnections[$uid]))
* l5 i2 [% j, v7 Q - {
% v+ w" K4 @. |; F Q: P - $connection = $worker->uidConnections[$uid];
- J, h) o4 R& h* a - $connection->send($message);. L- O6 g$ _8 r5 K& B
- return true;
( F: p1 N2 f u. T5 @$ i& a - } q, l/ ? A) z% W8 v# A1 n# n6 R
- return false;
# o; K2 Y! @1 E1 T8 c - }
! h: C* n$ c# ~7 x2 p( A% T; p; O
5 i2 \1 L6 O( X2 j- F- // 运行所有的worker
8 h M" K. Y) T3 ]3 E - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
! C3 D& G) _5 a( N - ws.onopen = function(){) Q, W5 h. Z, \2 x8 i8 n- \
- var uid = 'uid1';4 v) g7 ?6 P1 u) G
- ws.send(uid); i' B2 _! U) w+ ]
- };
' R( B# g( l7 G' H - ws.onmessage = function(e){: e; z6 Y1 N: S* J, r+ [1 o! k
- alert(e.data);
! x! c1 M; X: y7 g5 x - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口. Y& x( ~6 B" x
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);4 C1 E7 d( A h- D- p
- // 推送的数据,包含uid字段,表示是给这个uid推送3 Q. Z% q4 h; K: l2 D- E
- $data = array('uid'=>'uid1', 'percent'=>'88%');+ Q' _' p- r6 A6 k
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
7 A9 ^4 Q3 h% A# j - fwrite($client, json_encode($data)."\n");$ Z" T1 P, r0 z$ J* r, n. j
- // 读取推送结果
9 L# V$ J {/ s# E6 Y - echo fread($client, 8192);
复制代码 ! ^/ L) f8 R2 a5 o
7 ?6 n5 p' q1 f; b% ]# E |