- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;+ S! ~3 Y2 R" m5 c
- require_once __DIR__ . '/Workerman/Autoloader.php';( L* T3 L' B% |' Y0 C. I5 a
- ' {+ t: @# t7 a3 N
- $worker = new Worker();
6 P! A* x8 E3 v! R - // 4个进程
/ M8 ~! B2 ^# y2 y; ^. w4 q - $worker->count = 4;! B7 m$ g% @$ z( B1 P2 m p- [4 J
- // 每个进程启动后在当前进程新增一个Worker监听7 B% R- m$ J$ K- m, Z: u) Y
- $worker->onWorkerStart = function($worker)
. j7 M( b+ p: C6 w - {
8 G, n# Y4 Z0 d - /**; M9 Y+ N& `! C7 g" J
- * 4个进程启动的时候都创建2016端口的Worker
, U3 c6 k$ g2 n - * 当执行到worker->listen()时会报Address already in use错误7 I3 [! t3 U ^! e2 e- q9 r/ T
- * 如果worker->count=1则不会报错
- N3 t" [8 H! o1 H - */, C2 N7 C9 F; P9 S1 d: v4 T
- $inner_worker = new Worker('http://0.0.0.0:2016');% g+ X5 W: S( [5 [
- $inner_worker->onMessage = 'on_message';
) Q( H. P9 J4 ]% q) r- O - // 执行监听。这里会报Address already in use错误
/ K# z. Q! T5 a- V - $inner_worker->listen();
+ L+ t; f2 w. X2 k; o& v& H6 H8 } - };
# a9 _# R9 i" W+ }) J, i% ~ - : p3 I( R( e: e" Z9 M- X
- $worker->onMessage = 'on_message';
) b* i3 x* R+ l+ T1 D' q- j - + m' x4 d: J( a4 }, B, H% N5 ^
- function on_message($connection, $data)
$ k. Y, e6 @+ y* v0 `2 h - {
! N/ v& d' {2 l( s- Y: \( ~. @+ x - $connection->send("hello\n");8 ]3 ~9 B- Z5 h7 z4 q# C
- }! P1 ?/ r) a# m0 K1 y' _6 ^' Z
- 8 a2 l/ V, q0 M8 v+ A4 M. ]
- // 运行worker# G* P! V, n7 @2 I! n8 k( ~
- Worker::runAll();
H0 q! s6 p# G c - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
4 M9 e% p1 D& r
e4 m& Z6 u! {' D% d/ Q; h. I: e- use Workerman\Worker;8 K* u% b) K& Q% c2 j# D8 o2 M
- require_once './Workerman/Autoloader.php';
9 \0 P$ P7 ~1 M" b
# i; W+ o6 [5 A( m# S+ k4 t: o- $worker = new Worker('text://0.0.0.0:2015');
9 W* f: O, {1 T4 X- n" P9 u - // 4个进程$ \5 e6 d. K5 |0 o$ j- a* _; V
- $worker->count = 4;
$ A' v4 P: D6 d/ d i' ? - // 每个进程启动后在当前进程新增一个Worker监听
: `' `- r" ~. o! n+ A) q/ ^( ~- q - $worker->onWorkerStart = function($worker)3 A/ E# `# L: D
- {- e$ u$ j9 j. V4 X8 q7 s
- $inner_worker = new Worker('http://0.0.0.0:2016');* B- e" K {. ~; m% l* Q$ B# X! {
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)) |! T0 z. C; ?2 O! Q6 ^
- $inner_worker->reusePort = true;
# g% p+ O- u6 z- ~- t- m - $inner_worker->onMessage = 'on_message';
4 H# L! e7 k- M$ k# v" n4 ~: J5 c3 w - // 执行监听。正常监听不会报错- y5 ~% S% H1 C/ D3 o+ d6 t& `, u
- $inner_worker->listen();7 Y$ ^& d9 W6 G# t( A/ Q7 ]9 E
- };5 b/ G/ o8 _! X: X/ Y
6 f3 p$ X& m5 u& X- $worker->onMessage = 'on_message';
5 U# d4 [, P$ j, W: R
% `* _6 I$ T! c6 a- C# M' W( v- function on_message($connection, $data), b- p8 [1 r$ n: t8 j
- {
. D# T/ x0 `% V0 r/ w0 X+ j' h. L - $connection->send("hello\n");
2 [9 w: y, T' r, P1 t5 j: g - }
) r" _+ ~+ u' q( ?
. D4 Q) ` {0 [; n0 h- // 运行worker
' W F& b& M$ e) d! c - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
0 J, x w8 S* q - use Workerman\Worker;
( {, m1 G* _( y - require_once './Workerman/Autoloader.php';
5 W+ d- j) i' {$ g, x1 l# G2 H6 m - // 初始化一个worker容器,监听1234端口
9 |1 J6 P# f8 L: [2 E' H - $worker = new Worker('websocket://0.0.0.0:1234');
8 E) Y9 W, H: o- Y" _# t
: n+ d1 p) e# b G% O# T- /*
4 O, e2 x ~' m, H1 J! Q* t - * 注意这里进程数必须设置为1,否则会报端口占用错误& Q1 K, e y9 V& u! L- l
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
8 s/ P. d d& S' B - */. [ N* D: Y6 @
- $worker->count = 1;: D! F( n/ ^/ m0 i4 j2 q
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口& m% K: w1 o) w* C e3 T( |! C
- $worker->onWorkerStart = function($worker)% N0 j1 ]# c6 k; X5 q6 a$ l# _
- {
5 y0 s5 ~& \9 H( K - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符* L' _+ {- a1 p6 W! [
- $inner_text_worker = new Worker('text://0.0.0.0:5678');# c. S0 m! o n6 D! J: Q1 H6 z
- $inner_text_worker->onMessage = function($connection, $buffer)- l0 g8 K) W0 A" A! j
- {5 `. Y8 r8 O( ^& C+ h; a
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据" |% @/ S1 e7 S& d- @7 W# m
- $data = json_decode($buffer, true); f# ?2 q5 o; U& @; m
- $uid = $data['uid'];
* `# o5 r5 Z4 f - // 通过workerman,向uid的页面推送数据
5 {" E) s5 o7 R1 ? P( S - $ret = sendMessageByUid($uid, $buffer);9 t) e' |9 o' R' ]1 R7 e4 t& n' A
- // 返回推送结果
5 U, A- j/ ~% {3 R2 z- e - $connection->send($ret ? 'ok' : 'fail');' S( m% P& Z+ a- m+ X6 o& V
- };
9 j" x, ~" M9 B9 X" K - // ## 执行监听 ##( }2 C" C- j" H/ n. `6 y. w/ T
- $inner_text_worker->listen();
9 W z# ?6 c' Y" H, p - };
, A$ g' ]/ Z- r - // 新增加一个属性,用来保存uid到connection的映射0 g/ X/ b8 h0 _. }5 O. S# |- }' n
- $worker->uidConnections = array();
* K6 g; O H" k. W( X: k D - // 当有客户端发来消息时执行的回调函数6 Z% Y7 q- L& N, u$ e% U
- $worker->onMessage = function($connection, $data)
. l9 w g+ F/ O# ~, Q( v. W. ]: U - {% L, q: J3 h5 D. @
- global $worker;: a% o: b( x' d+ `
- // 判断当前客户端是否已经验证,既是否设置了uid. b1 i3 ?; m/ y
- if(!isset($connection->uid)); Y: {" c4 K, h( n& i9 |- t
- { q0 g2 I, ~) z- r6 a% |
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
9 ^- Y! {4 q/ K% p! ^/ V - $connection->uid = $data;
H) B: D! f1 D* |1 H; v- T - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,9 `/ Y1 B7 h4 [+ ^: y
- * 实现针对特定uid推送数据( O4 G( K& z9 I y; O( ^0 e: w
- */
, @+ p3 ]( I' P - $worker->uidConnections[$connection->uid] = $connection;
8 I6 D5 z0 O7 [( y - return;6 H: X, g/ l6 s; ^3 p3 Q
- }
; L3 P) G, s1 W) K4 j% y - };
% a4 F) O+ g: l9 ]8 K5 i
$ H5 a7 v/ W3 s7 p9 S4 s- // 当有客户端连接断开时. h0 ^( n: _6 v4 V5 D4 o( Q
- $worker->onClose = function($connection)
) V, } w* w( M0 n6 m - {1 B# R9 J O! w. @) u
- global $worker;
: }' T; x. x8 b4 v4 E8 i - if(isset($connection->uid))/ Q/ C8 l' a+ B& ?* E, v
- {( i) ^* j8 I I0 y/ v/ y$ f
- // 连接断开时删除映射
' T7 E* [% }* s$ J" ~0 X0 H( [ - unset($worker->uidConnections[$connection->uid]);$ O* |& N. |- Q: \4 F1 G6 x V
- }- j1 L b/ N) e P6 Q
- };
5 b3 }& e- ]9 U$ T
" ^( @8 z: t) i* X- // 向所有验证的用户推送数据
6 Z. \! q) D: S/ E {. g2 Q$ V - function broadcast($message)* F/ R5 ?# E/ x9 n0 F
- {3 g3 {6 i! g+ J: i
- global $worker;6 U3 _& _8 d3 _6 P! j! u) H, J
- foreach($worker->uidConnections as $connection)8 P5 T6 _3 h: W8 b0 ?. ?
- {
j5 H+ |& q+ r7 s - $connection->send($message);
4 {4 l n7 A+ | @5 g' G# n - }
5 y3 S$ `: \( z4 [5 g! } - }
0 T1 I1 W/ T6 ^8 n - & |) W3 q1 J4 N2 N& ~% A
- // 针对uid推送数据0 I0 }0 i6 ]9 `" u/ U
- function sendMessageByUid($uid, $message)- j' t! h# z$ j6 A! Z
- {
) @# D! `: q* {6 f" @0 H - global $worker;, ^) O) [) m& c; F- p2 ?/ V
- if(isset($worker->uidConnections[$uid]))
* G0 m) d# @( p* G' f9 E* q4 [ - {
3 y& ^# O% k) g0 ~ n/ K - $connection = $worker->uidConnections[$uid];
4 X6 }( e7 Y1 l& ^ V N - $connection->send($message);
+ ?% }) Z, a9 F! ~) U' t - return true;
: k4 B2 w4 t3 z7 N- L; } - }3 N8 W T- [* d3 g
- return false;$ ~" @ E: ?; W7 y% w0 I
- }
/ D" V7 {6 d$ ?: O
1 p, R( w; b: `9 f9 \/ v- // 运行所有的worker
& R2 [, B+ `) y' P1 k - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');& b$ ^. w# }2 x B1 D% l! {
- ws.onopen = function(){
8 V: v1 H+ l2 b. [0 w, d - var uid = 'uid1';1 H9 O/ i) U. x1 {# ~
- ws.send(uid);
7 a2 L; s v0 O6 |* R x - };& M( P7 N$ F) F- t5 G% y
- ws.onmessage = function(e){
/ I* @. {( c4 Z - alert(e.data);- d8 m, [1 x5 }! k! x
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口( w7 a m2 @9 J7 F+ E- i% O
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
( ~/ M: w$ K( }+ u9 U( Y - // 推送的数据,包含uid字段,表示是给这个uid推送% R$ T5 m- v+ G+ R. Z
- $data = array('uid'=>'uid1', 'percent'=>'88%');
% t% h+ Y) V* D; m* W4 r2 w - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
- e" G0 H& W& n, W: B - fwrite($client, json_encode($data)."\n");6 E' N# I" S* g( d4 X0 l# I) h
- // 读取推送结果2 j! {, {2 ?5 l1 L* k) }# ^
- echo fread($client, 8192);
复制代码
5 w/ h! d U9 \/ R8 R' o
* e$ ?: H3 I7 o |