- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;- ?0 B/ s5 U; d" S4 G- M# W' @
- require_once __DIR__ . '/Workerman/Autoloader.php';/ G2 l2 M# E. @1 V2 y
& `% _, w' r& f8 [( k- S s7 j- $worker = new Worker();) n2 j1 }( r- b/ q" r
- // 4个进程
1 e% ^6 @: Q3 i9 F# K - $worker->count = 4;: Y4 I' c' q+ U; z$ C) y/ o- C; @
- // 每个进程启动后在当前进程新增一个Worker监听" D/ {! |6 Y8 k1 Y# n
- $worker->onWorkerStart = function($worker)5 G& B# x$ ]9 f4 z6 I* \. K% M
- {2 t* k, w+ |4 j) ]& Q
- /**
3 w% i q% q" t1 Q, g! J5 ]; f - * 4个进程启动的时候都创建2016端口的Worker% Q8 G/ a1 e' k4 q
- * 当执行到worker->listen()时会报Address already in use错误+ ?$ O3 p0 v" o" C) l- h
- * 如果worker->count=1则不会报错
# k; ~" w3 S! ~& F; T. K' r - */2 R0 I1 c* m( N9 |! p* |! k
- $inner_worker = new Worker('http://0.0.0.0:2016');0 L' v1 E7 K r9 w; e' d0 }! y
- $inner_worker->onMessage = 'on_message';( V4 j. e Q! y
- // 执行监听。这里会报Address already in use错误
: M( Y7 Z6 p0 l9 e8 [- L/ F - $inner_worker->listen();
) t" \* }0 A' K, n. ], j9 a4 R+ U - };
/ I2 I6 ^; N- x# _7 ~: ~) z - ; \( r$ g9 @0 q5 s$ L
- $worker->onMessage = 'on_message';! Z/ @3 x; c& c( X
5 ?2 T* O% T' M+ {4 Y% i% T- function on_message($connection, $data)2 J4 Z7 U# c+ m& ^% j% c& W
- {% Y% s9 }: `4 U3 v2 d' G
- $connection->send("hello\n");) C' h8 E+ y! W
- }
' X# B) ]' l6 F! d$ I
4 f V( @4 H+ l+ D3 S" a- // 运行worker& T. H5 n+ u2 y" K
- Worker::runAll();
) ?& B1 {8 K. t( I) I6 {+ e2 A - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 `+ V# s* E. ]3 C' b; A
- - ]$ g, R0 Q8 E1 q$ m1 [' g
- use Workerman\Worker; C' Y& e0 s) K3 c
- require_once './Workerman/Autoloader.php';
! |) ]! l; i+ u1 u3 L
2 J* Z" @. U) }9 U' I" U/ H- $worker = new Worker('text://0.0.0.0:2015');
5 P9 m5 g- ~3 @- _* _$ y - // 4个进程
! t# U3 f) C* \8 z3 B: K4 L4 J1 } - $worker->count = 4;! p/ }( p5 h) t6 Z# P
- // 每个进程启动后在当前进程新增一个Worker监听
, o: g" e. y/ E% O* N5 V! m% Y - $worker->onWorkerStart = function($worker)
- u: r# a7 ~+ p4 L/ K, {: C - {+ T/ T7 \0 z8 Q- {% d
- $inner_worker = new Worker('http://0.0.0.0:2016');
7 z7 b( x6 s* E( ^% k - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)$ Y: q+ I/ `! V5 R7 n/ [
- $inner_worker->reusePort = true;8 t2 c' B) t6 k2 P1 P, b! {
- $inner_worker->onMessage = 'on_message';
4 J) B; u1 r, A3 ?& B# z" \9 j - // 执行监听。正常监听不会报错
+ ~/ A+ q' m F0 P2 d$ ` - $inner_worker->listen();
* e v7 s& _; h5 ^, Q& Y - };6 p$ E3 c9 A' e: K" w K1 C/ F
- 9 a5 w" i R8 o) `
- $worker->onMessage = 'on_message'; @0 ^/ d! r& A9 Z: @! S
- : G G( L$ n2 a3 |
- function on_message($connection, $data)5 _' _' C! Y# @7 t9 `7 z
- {
7 m7 U! Z' ^- f" A5 d/ c - $connection->send("hello\n");
$ H q* H- q, @# a' o0 y - }
3 i2 y5 {8 P$ I$ W0 P9 j, P! ^1 X - 6 J \. s" e: ?! B( x
- // 运行worker2 Q v& U6 g# F1 e
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
# U1 Z. M% S) G+ K/ j$ w - use Workerman\Worker;- I$ B, e* |8 d4 N
- require_once './Workerman/Autoloader.php';1 m* y/ F. g) W) j
- // 初始化一个worker容器,监听1234端口
9 e* [. a0 R# m* H - $worker = new Worker('websocket://0.0.0.0:1234');
' q9 A8 q d. ?; |
! N7 ]1 Q9 m' m- r: \) X7 r$ ]8 f- /*
# O, x2 O$ q$ q$ D/ v6 s1 |; I" L - * 注意这里进程数必须设置为1,否则会报端口占用错误9 @% B1 W# L R) i( y' H! d
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
3 m4 i5 `9 c6 o$ M! {# T - */
% {$ r8 |, K" P4 f! S& b) c" b$ R - $worker->count = 1;
+ Y' t- i. q: W - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
# Z) I2 ?6 b' H4 }( n/ ]* a - $worker->onWorkerStart = function($worker); I. K' q Y/ h
- {; A, Z9 }1 y, f8 Y8 H
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) @9 s g2 d9 n/ C2 E5 e& ?: X
- $inner_text_worker = new Worker('text://0.0.0.0:5678');$ b# G$ I' g" ~: J9 f3 F5 u( B
- $inner_text_worker->onMessage = function($connection, $buffer)
# s7 H/ a9 z2 J3 n( [ - {8 r5 m4 M# Z) @- f( o/ F4 {1 b n) H
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
: R2 o& ]' M; R( x - $data = json_decode($buffer, true);
" Q- E# ?0 E: q# r- G; K" J4 R - $uid = $data['uid'];
" G9 E: L. |/ O! S. z - // 通过workerman,向uid的页面推送数据& S1 ~$ y' X7 {) T- L8 O
- $ret = sendMessageByUid($uid, $buffer);
; ]9 n) y6 _6 A) n# x: p K - // 返回推送结果
1 [) E! U# L7 z6 ~' W) R - $connection->send($ret ? 'ok' : 'fail');
3 x" Z: E* k' d$ h" K$ ] - };
1 i* ?% x/ u$ ?- V7 K - // ## 执行监听 ## N% S) B0 t- y8 c
- $inner_text_worker->listen();
. }# v }& r+ l - };
9 B% z4 ^+ I# N - // 新增加一个属性,用来保存uid到connection的映射5 j% m G n' m0 g( z& ?
- $worker->uidConnections = array();2 O! k/ i; K# b% E
- // 当有客户端发来消息时执行的回调函数
& S# I7 @+ E" p; y' W% h0 n8 l) ? - $worker->onMessage = function($connection, $data)
9 P# G, [1 }+ h. _+ [ - {
% F. n' P& H. Y; J2 m - global $worker; z4 g" w1 \' t) ~( I0 c( J5 R* l
- // 判断当前客户端是否已经验证,既是否设置了uid
5 K! f6 V6 l @: I3 A4 p0 v - if(!isset($connection->uid))
6 A* V) c& w- F" w- X" B0 H - {5 N5 c" g! G0 l9 ^# X4 r: i
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证); ^9 p3 w U# y# ]& M5 }
- $connection->uid = $data;
) J" O6 k1 ^" q3 g. V+ o* b. a - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,) f4 }- \+ T. g6 v
- * 实现针对特定uid推送数据
, {/ I. W; u- N% I3 O+ L, p - */
4 C0 n" N, r" [9 ^: ^ - $worker->uidConnections[$connection->uid] = $connection;$ ?+ Z) I$ E, m0 Y4 w
- return;6 o1 H7 [( c- U' p' ?
- }
9 c# [7 H$ n, \' q - };: n. Q! T, Q7 Y* a5 W6 }% N- ?
. M0 P; A* `7 b7 d- m, D- // 当有客户端连接断开时
( a5 Z: D8 q* C4 K: l' l, d - $worker->onClose = function($connection)
- d! h, Z, A& v. b, @7 U2 S) I$ L - {
0 T1 w; |* n) V5 u - global $worker;
* j* m1 X8 p( } - if(isset($connection->uid))
' Z3 N5 k+ P- j - {. p* L5 C$ }; z" p
- // 连接断开时删除映射
; Z) m' }5 H/ `) m6 p7 x) M - unset($worker->uidConnections[$connection->uid]);
2 b4 c5 \8 P' ~: O! G$ Q: W - }& Q- j! F8 m0 v ^4 s/ P, n
- };, B2 K9 W8 u8 i0 o! Y6 w
- ( w6 ]5 s- N4 B' t
- // 向所有验证的用户推送数据5 Q7 k: G$ V% r
- function broadcast($message)
9 F" {% x( v0 b) n F8 Y. v - {- }& t w k5 ?7 ]
- global $worker;
) p) I5 `! j" q/ w0 [ - foreach($worker->uidConnections as $connection)
! B2 G6 m6 W1 {- h& Q2 O& l* \ - {
0 U; m7 n9 a1 C( k3 ^; L - $connection->send($message);
/ m' w* ~' U1 i% f6 q# S - }
7 ^% R7 x/ j j# k( q+ s - }
+ U5 L {7 Y1 k/ X, Z - ( l4 S$ T" Z7 N* S& e
- // 针对uid推送数据
7 ]. |( s5 F- f" _' U: `4 L) l8 j - function sendMessageByUid($uid, $message)
6 i! o7 B9 ] i - {
( w3 J& K2 g# @! Y8 X - global $worker;
4 k; _$ [% c6 c - if(isset($worker->uidConnections[$uid]))/ t- Q9 |' M5 s9 a: l
- {
! j. t) C$ [- h5 w9 V" C - $connection = $worker->uidConnections[$uid];
+ n$ T7 W" i. }+ G - $connection->send($message);0 p- _2 f7 B# F" w: k5 R
- return true;
! v8 g2 y, k) P7 g - }) p5 a6 l7 U3 H
- return false;3 A6 m( E& Y' I' D
- }
' z% }4 I" t8 G
, F9 A) _/ g8 h+ J( b- // 运行所有的worker8 d7 }+ q9 B5 V4 \# W" l6 A& P
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
, s, Q x/ m* \9 S/ Q. a - ws.onopen = function(){
7 o( K2 {" }8 s. q! v6 W3 {; M - var uid = 'uid1';1 r4 o) N, `/ Z( [, u b& S
- ws.send(uid);
4 A9 ~, z5 ^/ B3 r$ k - };* |+ G2 e( i: k1 X6 e5 s
- ws.onmessage = function(e){
$ r3 F& m9 Z! l6 Z$ h$ | - alert(e.data);+ ], i' h0 D; F( p# u$ Z- Y
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
$ _ C; w5 h' X/ w - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);# G9 M. c% u, G
- // 推送的数据,包含uid字段,表示是给这个uid推送
/ c. B" W) j0 F9 D1 i: F9 f* c3 ^$ Z - $data = array('uid'=>'uid1', 'percent'=>'88%');2 D9 D. F/ W; T& p4 n
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
' L: U4 s; N2 l: f h - fwrite($client, json_encode($data)."\n");& O" C3 ~- g( f/ D7 c
- // 读取推送结果: [) r! n; Y* j4 u
- echo fread($client, 8192);
复制代码 ( v% e4 k# l) F7 s8 f: K
, M5 g% b; N; r2 { |