- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;. A) ]8 F+ U7 W } b/ h
- require_once __DIR__ . '/Workerman/Autoloader.php';
5 K4 m9 E3 f& |" z+ C/ S
. q: x2 ?- Q3 J8 j8 {3 N- $worker = new Worker();
6 W0 Y5 R8 a3 f - // 4个进程( E6 L0 V+ F" @# H3 k
- $worker->count = 4;
% q' `7 `' i% W, P# b- t: G - // 每个进程启动后在当前进程新增一个Worker监听
2 U' A* c- ~& j. z9 X# s2 s0 _ - $worker->onWorkerStart = function($worker)5 {: g5 v$ P( V H. f
- {) ]/ I7 f$ ?3 Q+ j' Y
- /**3 m) V! j5 @* D* c3 @3 F5 s
- * 4个进程启动的时候都创建2016端口的Worker7 q5 G9 W0 |4 j1 H# U2 L
- * 当执行到worker->listen()时会报Address already in use错误* s8 Y4 j6 M+ _1 ?' q
- * 如果worker->count=1则不会报错
* W+ f r) f9 U. }; L% c - */1 V, P9 `) ~2 F& _0 a
- $inner_worker = new Worker('http://0.0.0.0:2016');+ V" d4 M) p, t6 m* D( C$ H' N
- $inner_worker->onMessage = 'on_message';
4 z6 C6 k+ J* q - // 执行监听。这里会报Address already in use错误
/ u6 T# O4 n3 Q0 P - $inner_worker->listen();
& }" U7 v, w+ j7 ?5 k - };
9 n! z. F/ ]: C N* z O+ X' T* f - 7 b! _4 L1 `0 m1 z9 ]1 _
- $worker->onMessage = 'on_message';
. m: E9 }' r5 W! X) I
6 j6 R/ T7 r9 U& \7 z) Z* }) A/ U- function on_message($connection, $data)
t' j! s4 _0 X6 I - {
. u9 i- `- @+ B, X - $connection->send("hello\n");
( Q# S. M: @7 M# c; Q - }
% `- C/ B2 |$ T, i
- ]9 N4 o4 H1 Z) D' `- // 运行worker
. y" z) o3 W5 |" Z# ` ` - Worker::runAll();
; }. ~+ W# m, X" B. I - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:; P4 P' s9 ]6 b+ s! H% M/ ]
! }9 e, h( s; x6 N- use Workerman\Worker;, d& ~! r5 [1 L" [# Q
- require_once './Workerman/Autoloader.php'; O, ^9 G8 i9 g. Q' g, T' H
+ y6 s y6 D+ }% B- $worker = new Worker('text://0.0.0.0:2015');
% d+ j' K# P& | - // 4个进程4 s; E. k6 h7 R0 \
- $worker->count = 4;6 Z; D, ]$ j- I) |! S4 m
- // 每个进程启动后在当前进程新增一个Worker监听
6 X9 Z( l9 Q( t, y+ ^ t5 r - $worker->onWorkerStart = function($worker), a2 E& C4 J& e7 v* E7 z- B
- {
. d, Z: Z6 I: B" o( P - $inner_worker = new Worker('http://0.0.0.0:2016');
( k, s, n2 \8 @9 O - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
0 g: u, }5 [4 X5 g% M5 r1 ~( K - $inner_worker->reusePort = true;
3 R; F9 D9 g) x# E - $inner_worker->onMessage = 'on_message';: U+ n; r) ]/ y, L3 a
- // 执行监听。正常监听不会报错
& S! o5 D9 W' D9 j1 W' l3 T - $inner_worker->listen();* `: ^% x) S% z0 b% _
- };* `% q9 K: ]* Q* m' C
- + d+ t- V3 e& V
- $worker->onMessage = 'on_message';
& l" f+ x6 ]4 Y, r# c0 p* o+ q! F
0 A4 u3 u( ] t7 N7 s( H9 _- function on_message($connection, $data)
3 Z6 d( M& y1 {% \+ u# E - {6 h" k" P$ N% \3 x6 a: \$ P
- $connection->send("hello\n"); }3 r0 r# j+ P, |$ ]: U, F8 i
- }0 h' j+ o5 E; Q3 ` A) U. @
- ' G& P, K4 N# _' V
- // 运行worker( n( K. y; F9 R# `) M
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
) h1 T: b6 S N7 x - use Workerman\Worker;
# J h, } z# ?2 E4 q. l P' p - require_once './Workerman/Autoloader.php';
. c/ Y3 S! l& t9 I7 t c - // 初始化一个worker容器,监听1234端口6 u% ~$ v" m. G/ x
- $worker = new Worker('websocket://0.0.0.0:1234');
7 L* t1 g. f0 }5 w' h
; t/ t* e) Z* F/ n; A1 }- /*
. n q$ a3 @" H6 t O& x! Q1 f - * 注意这里进程数必须设置为1,否则会报端口占用错误7 H% V; p4 @* t
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
3 Q7 ^9 i4 j, W5 T( I$ Q( h - */
2 a1 j) j F4 I - $worker->count = 1;* z8 R# N; A: \4 H6 }5 B- p' q
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口$ C9 m/ [! Y7 c; y' B, F1 z
- $worker->onWorkerStart = function($worker)
0 y$ D* a4 L- _ - { [: c. B7 q- t" B% N; f
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: z/ S9 l' T# K9 [' D
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
, z& a3 @1 G% n' i$ p" | - $inner_text_worker->onMessage = function($connection, $buffer)( x7 u$ P* Y; d2 o8 ^
- {# I) E# G0 p. N: w5 F
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据: i* q, p* n1 d0 b G, m
- $data = json_decode($buffer, true);! ~8 U2 q& k( ]# f+ }
- $uid = $data['uid'];; g, D5 _7 w( L i7 C% U
- // 通过workerman,向uid的页面推送数据" K P' ?7 w2 t$ }
- $ret = sendMessageByUid($uid, $buffer);
6 M) h" [/ s4 {+ [5 v- L# C1 d - // 返回推送结果# D: V! J) u; w' B& N7 D
- $connection->send($ret ? 'ok' : 'fail');; C9 p( u- w: I7 J9 I- c5 V
- };7 O! h+ q0 y( ]% q
- // ## 执行监听 ##* F5 j' j) e' s+ F* ?9 B9 C
- $inner_text_worker->listen();5 N% E' x9 w) |2 _6 w& Z0 r
- };
# f, q8 Q5 ~& ~( h/ \4 A# K - // 新增加一个属性,用来保存uid到connection的映射
- _) [& E! B& k" Y - $worker->uidConnections = array();( k- i6 O3 c: o4 O
- // 当有客户端发来消息时执行的回调函数( e" M. X* N- Y+ h8 U+ g& Y0 Y
- $worker->onMessage = function($connection, $data)6 m2 S. `5 U/ ~. t, q
- {& C' s# j1 T( }1 O" E
- global $worker;
0 {7 \# @9 z0 N# o y - // 判断当前客户端是否已经验证,既是否设置了uid
" ?' I* z% P0 y* k" `% i/ w) k - if(!isset($connection->uid))) o' T+ |7 [9 k2 @+ r- q; i! d
- {
& P- x) m6 Q1 E0 u2 x - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证), B$ x' L' T# y! b
- $connection->uid = $data;: y& ]6 n9 ]+ b; I
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
- `; J* k; i0 q' Z8 g8 I2 b$ I" {. N - * 实现针对特定uid推送数据
; M8 Z( \- o0 c; Q; k2 m3 [ - */
: u% r0 \( N D8 a L0 P! X+ F0 Q8 r - $worker->uidConnections[$connection->uid] = $connection;
* X! Z# Q& C" ^+ y& ?. \ - return;
4 _0 n( h+ K+ t9 w0 E* O - }2 ^# K6 V) L) s
- };
% l1 B! i2 z* r) N1 h, g' i - " t- n- W2 e: i4 O& o2 e
- // 当有客户端连接断开时
4 O& ?' E/ g, X4 b9 B - $worker->onClose = function($connection)6 j* i/ C+ [ z- T8 d7 i8 R
- {/ A0 a% w+ [% R+ f( k
- global $worker; \, |1 D& Z1 z9 q- @* \/ ^
- if(isset($connection->uid))& ?# P# G$ q3 E4 E* y/ P. W( F+ a
- {) c: W/ u, S$ {/ `
- // 连接断开时删除映射2 j2 k% z! D1 S5 \( g( u5 E
- unset($worker->uidConnections[$connection->uid]);
) Y3 p' v# r2 Y( F( w4 M; D - }
6 _, w# V! W; V$ _8 M - };
9 z5 g& {( { d, c1 z" C5 n/ x - 8 ]' d/ }, p# E! v
- // 向所有验证的用户推送数据& u4 g8 O) y: F9 T1 S* V; [( D
- function broadcast($message)
/ r' X$ U4 o; S+ r; V - {2 L o' n; l7 j# Y* V+ {" K
- global $worker;- v: O: t" k4 W# A% f* g
- foreach($worker->uidConnections as $connection)
- _! U" X t* \/ U1 b+ L - {
9 p5 t7 x8 `4 F8 o1 ~ - $connection->send($message);
, b, k5 i" p$ W; { M) ~- b( L - }
, V# a8 P3 m8 x% x( f - }
$ y+ F8 |2 k1 i/ c* @- _; j - 0 s. B& ~) c- k) V6 F
- // 针对uid推送数据 ^: U$ N% p- Z. R
- function sendMessageByUid($uid, $message)
o+ ?; s/ u3 d' _# Y6 C) T9 _ - {
I4 G @: ~) t- V. ^ c7 J6 z - global $worker;( B: Y; C1 {/ L3 d& y; }
- if(isset($worker->uidConnections[$uid]))7 |0 r/ u! M. \: z
- {
! A: j% x* R9 |9 Z# t2 E* F6 { - $connection = $worker->uidConnections[$uid];
9 G0 |& K( g1 z. ] - $connection->send($message);
4 a% U* u* F! l4 u; h- A9 h - return true;
8 A/ Z* f3 H. ~# {- K6 l - }( n& y2 i e* s
- return false;" E) J+ I* |$ [+ }8 u3 o$ ]# y
- }
- `9 H8 [+ Y( F+ f
8 A% @; |! ~8 O; w) U- // 运行所有的worker
5 S: Y5 M8 v, a - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
4 J0 j4 `4 s8 a) T9 f - ws.onopen = function(){
" }" b9 o- K2 u( b - var uid = 'uid1';# k% R. o. g. a0 D: m! o! p4 I
- ws.send(uid);" @% {- B* P5 A9 C, \
- };( D, q* r0 O5 s1 k" g8 l
- ws.onmessage = function(e){- Y4 M9 L0 r- e) U
- alert(e.data);, y* y. V# v5 l9 m+ E( H
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
& @; z; p% U" P9 O2 q" l - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);5 o" Z/ [: X0 X$ @% d' Q
- // 推送的数据,包含uid字段,表示是给这个uid推送
$ S3 ]3 w$ q# q1 Z& I' \: ^ - $data = array('uid'=>'uid1', 'percent'=>'88%');0 S* s* G, m; U j- D, j+ J) w
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
5 Q* U$ f6 }% c) G - fwrite($client, json_encode($data)."\n");
" ^ z) |5 J$ M# g6 u' M - // 读取推送结果/ X' M0 [7 G2 q' ?' a
- echo fread($client, 8192);
复制代码
u% U" q$ @: s& \
( s. M4 _* ~& f; Q/ p! b9 U0 W$ J |