- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
* v Q$ r: b4 E* m# @ - require_once __DIR__ . '/Workerman/Autoloader.php';
* k5 `5 S5 r& F1 m5 W
) g, Q3 i4 c0 L4 k( ?" u- $worker = new Worker();
' c# T2 l. L5 b! T, B- f& {4 W! P+ h - // 4个进程. C& Y5 X0 y. Z+ H, p7 r( s1 o
- $worker->count = 4;
5 D7 n; V, l4 `& f( z9 C( D - // 每个进程启动后在当前进程新增一个Worker监听* m; g) e9 z: N( p+ z' m, w
- $worker->onWorkerStart = function($worker)
7 [. R0 K1 K( E; l% y/ I K% Y6 R - {* \1 m- n6 o0 r6 H6 P1 I6 W
- /**' f$ I$ e3 e' D* b) b" k$ E
- * 4个进程启动的时候都创建2016端口的Worker
: l7 z( \7 A" W3 C - * 当执行到worker->listen()时会报Address already in use错误5 e* z' d$ `6 ?$ j3 F1 M
- * 如果worker->count=1则不会报错, A6 `7 S/ S& W; Y% j
- */
8 u2 O; I, ]3 x, D: P: g, x - $inner_worker = new Worker('http://0.0.0.0:2016');& o5 B0 m2 {3 N
- $inner_worker->onMessage = 'on_message';
9 k; [* F: W9 d$ G9 u% p - // 执行监听。这里会报Address already in use错误- N* C& A" @4 ]' z* \1 x
- $inner_worker->listen();
9 ?1 B9 j1 m( @9 W, J4 { - };
2 Q" a9 Z! b$ [: T - 9 e9 ?1 i) O, h3 y. d
- $worker->onMessage = 'on_message';
" a8 I" b7 Z0 b& W: _ - 3 _* A4 ?$ N: ^# B- f
- function on_message($connection, $data)
: q5 k0 E4 k' k" K - {
; n4 q S3 G! h p1 M6 x! b - $connection->send("hello\n");2 ?- E9 H7 s2 w& I# ~$ U; j
- }
$ w- g- ^" ^7 G S( Y - ' i' I+ y. }; B7 f
- // 运行worker- T; }) c; ?% N4 k1 M6 e
- Worker::runAll();$ p5 B- E/ S. U0 `: [4 p
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:) |4 k" w: ^) [# j; J) X
3 B% Q2 s! b T X3 C& U) Y- use Workerman\Worker;
/ U* n, Y0 ^1 U - require_once './Workerman/Autoloader.php';+ ]% ?4 S( S- A$ T k: U& }; |
5 ?- E, b, Z' L2 t$ H5 y- $worker = new Worker('text://0.0.0.0:2015');) Y. K' K3 H( [9 L/ d, h
- // 4个进程' [3 O* }, x- Z; k
- $worker->count = 4;/ ^9 E" R: W% V) X: b+ d
- // 每个进程启动后在当前进程新增一个Worker监听( E; w/ m0 `8 X7 q2 {$ a" S
- $worker->onWorkerStart = function($worker)* Q- m$ J2 s1 T$ U# C
- {
8 ^& P* s8 Z, K) D# N2 Q( | - $inner_worker = new Worker('http://0.0.0.0:2016');! e' R5 ~! x6 L/ w- ~8 Q
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
* J* k" j. F$ z$ ]: b' k - $inner_worker->reusePort = true;
( s/ O) G1 X# e) P - $inner_worker->onMessage = 'on_message';6 X' o \5 k1 Z
- // 执行监听。正常监听不会报错5 w, \, \) {5 [' _( ~! i& C
- $inner_worker->listen();/ U! n' K. c% _
- };
+ I$ J; d h% i& G - 9 z. m% r% P* n d: a) W. o# }
- $worker->onMessage = 'on_message';
; a4 W* Z9 Q6 D- _+ i$ f" f
: _6 X3 C0 Q3 f* I" v- function on_message($connection, $data)
1 w0 n) o. ^# s* n1 o* H4 p - {
/ e& C. a, i3 p% _. |3 Q) |, n - $connection->send("hello\n");* J1 ~) U O! i; r
- }2 [& v- z+ ^8 Q; d+ u, E
2 \! l7 Q! _" U/ w3 ?- // 运行worker0 k7 g q" j6 W- d3 T6 P
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
3 X: ?" z( N6 y6 C: O. C - use Workerman\Worker;; n7 u: Q- c6 B% p7 g$ h+ q# l, L$ _
- require_once './Workerman/Autoloader.php';2 I/ G; v2 W3 `: e5 i8 t/ M, z& M V
- // 初始化一个worker容器,监听1234端口
5 _. X7 i, C) C% r - $worker = new Worker('websocket://0.0.0.0:1234');
2 Z* d" P/ M- e# k+ Z5 C' W
3 U% k7 ]7 E. |: c% J0 }+ @; o r- /*8 b/ E' i3 B" F+ r5 U
- * 注意这里进程数必须设置为1,否则会报端口占用错误 _, ^) M6 s. S! w6 j
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)4 m+ D1 @9 l1 o
- */
; b. R* n' u& p - $worker->count = 1;
, r9 g# B' f6 S6 y, J - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
9 y- S. d( d6 n7 j% W, @ - $worker->onWorkerStart = function($worker)
! ~' B' q V \ - {
/ \1 g2 Q3 r1 x* [: I# J - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
3 A" o0 c# D: ^6 `9 T - $inner_text_worker = new Worker('text://0.0.0.0:5678');
- s) k& o7 W, J- T4 |9 N; @4 N - $inner_text_worker->onMessage = function($connection, $buffer)) o/ u/ ?$ s' q, ~: V
- {8 w3 |& u, `: G3 [' u0 ], f
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
1 W0 x, [" z I. [% \7 f - $data = json_decode($buffer, true);3 V1 t9 {/ i. }
- $uid = $data['uid'];
* m( e9 N1 s ?+ g" B; a( x - // 通过workerman,向uid的页面推送数据9 r9 V S8 r6 z$ r# t0 J s
- $ret = sendMessageByUid($uid, $buffer);& C% `" ? `+ h4 C8 ^; D
- // 返回推送结果
% K" p2 p8 G8 P ? - $connection->send($ret ? 'ok' : 'fail');1 F" W: f- `" o; C
- };1 J( Y2 D9 j6 P" S4 G# B( t
- // ## 执行监听 ##
( P, x5 i3 t1 e7 P T$ S, m1 k - $inner_text_worker->listen();
/ w( M* W9 L) S- e - };
) l! E- p* X& _! s: h - // 新增加一个属性,用来保存uid到connection的映射
8 G+ H) r( l+ N. \& l" x. o - $worker->uidConnections = array();% F0 M0 K! z- p9 Y: \
- // 当有客户端发来消息时执行的回调函数
/ l% ]7 _) k# R* r - $worker->onMessage = function($connection, $data)
* o7 k; [# l5 L& i+ [5 G - {
3 i( P4 C& M& b( M! M7 y - global $worker;1 r" ?% x/ x- u, [/ a
- // 判断当前客户端是否已经验证,既是否设置了uid+ ] I3 \% \5 D' _
- if(!isset($connection->uid)); D' u7 @: X& p B
- {3 t3 Q( Z- V4 H* I8 D' a$ Y1 T
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)6 ~( m/ M1 ?0 G6 y/ u
- $connection->uid = $data;4 ]& ]8 {4 `6 D+ |/ p
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,* A" \, I& e- ~7 v
- * 实现针对特定uid推送数据. \+ z1 d/ V: A: [
- */$ L2 {0 x' [2 Q3 ?! }" Z7 ]
- $worker->uidConnections[$connection->uid] = $connection;
5 n9 I! |2 P" \& Q- }2 t& e# i, G - return;: v1 x$ {6 y3 k! A) f6 L
- }
8 v) z; N7 S# B - };
5 K& U7 W9 ` n! ~! z
# ?, X* R4 f/ t9 P+ F- // 当有客户端连接断开时
- r: b2 \0 v# w2 w5 d! D) }+ k, V! Q - $worker->onClose = function($connection)
- [6 c8 j2 X8 s# F - {, G- P5 R2 r2 m) `3 o+ y% V2 C+ B" _
- global $worker;
5 ?6 {8 [% L `/ b - if(isset($connection->uid))
? }: i+ ?3 R# J - {7 P% J$ G5 H) I- a* b6 e" A
- // 连接断开时删除映射0 ?+ ]! ~) B' _4 H+ t: _
- unset($worker->uidConnections[$connection->uid]);
! \( X1 r, l1 Y5 G - }
# s- ^. K) N: c# { - };
2 r" i4 Q" Y* M* j" o - ^: f, ?$ n1 V) \& r! d. O
- // 向所有验证的用户推送数据$ ^- L8 M6 K4 ]3 T; ?* t1 T+ a2 {
- function broadcast($message)1 p9 C+ k6 p! p3 R+ r2 _6 T- H
- {
$ i1 i/ v4 K& R - global $worker;& o7 e* t- {3 w
- foreach($worker->uidConnections as $connection)
$ b& ^* ^- a9 R% m7 M - {6 \1 Q7 E% Y* K
- $connection->send($message);8 t2 K2 C7 d! { l* b- U: T
- }- g% R/ M: V, d: L! o. j: w
- }
n: J4 N; o7 H, U' P7 D* @
3 {0 \: P5 n0 ^. P4 v2 h- // 针对uid推送数据% b, y8 {! w& K- Q0 P3 d/ V/ t9 k5 a9 g
- function sendMessageByUid($uid, $message)
8 j n! [9 [" Q. a - {- x' v: u9 I2 q7 a+ s/ v) X- {
- global $worker;
# Q, @7 _. U; ~1 G9 Z7 t- f - if(isset($worker->uidConnections[$uid]))
5 G. n+ H d8 s; s( d - {
# v) \) F7 D4 O; i - $connection = $worker->uidConnections[$uid];
3 U% b# c8 y) [$ D - $connection->send($message);
) D# i" T/ X$ A - return true;$ T* r3 B9 c/ r* n2 }2 l
- }$ U6 }% h5 M' z& @! n6 I! j1 l# c
- return false;& y8 o; a" [4 _
- }
; s2 x v- l I - 0 `; J @- ?1 R' k. S! {8 f
- // 运行所有的worker7 K6 ?# I1 D3 L2 `
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');% z+ J, M. T7 u- P' K
- ws.onopen = function(){
8 s' s# A5 z# r! P" v' d- Z - var uid = 'uid1';/ l/ ?0 C# ]3 Y# X6 |4 g! A7 \
- ws.send(uid);
4 K3 D: Q, I' h7 _% U( {5 ~ - };
2 e4 ^5 E* @5 I- E! W2 g* h r - ws.onmessage = function(e){$ w( d. G2 X; n. y- g$ C- O* Q
- alert(e.data);& Y8 a1 O4 l; o7 y% j
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
" K$ d! v4 I- Y" y4 ~' U - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
- p% ? a( E- J: T" H# ` - // 推送的数据,包含uid字段,表示是给这个uid推送
% y! _& }4 V( b2 O0 ]* b - $data = array('uid'=>'uid1', 'percent'=>'88%');
8 q4 B6 T/ h2 k( \4 Y3 _9 v) c5 e - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符2 R* h( @; l' G; M1 H
- fwrite($client, json_encode($data)."\n");8 \. a" f# j. U8 Q! a. b
- // 读取推送结果
- R5 D+ F! i' ~/ @: \& q! P) ?4 ]5 @; f - echo fread($client, 8192);
复制代码 + a- Z- e* h( Y* C" P- G" R q
% R G% x" p% f5 S' }' _
|