- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;/ `* u. @( g6 V
- require_once __DIR__ . '/Workerman/Autoloader.php';# L: y6 U0 @7 v0 |: ^- `2 T! o
- 3 G8 l) d. f& U7 L
- $worker = new Worker();
) K) r5 A$ d1 _" i2 z* H$ G - // 4个进程
' _* {. M4 L0 }8 ^9 ] - $worker->count = 4;& q: S0 h# c* ?; A
- // 每个进程启动后在当前进程新增一个Worker监听
: p% c- ?9 O M- n3 t1 S - $worker->onWorkerStart = function($worker)
. D4 ?- v/ \/ U- y - {2 u0 W! y& i2 X' T
- /**
6 V$ ?7 R- X* v8 | - * 4个进程启动的时候都创建2016端口的Worker; u' t: _4 @; X. O0 A
- * 当执行到worker->listen()时会报Address already in use错误
8 y0 S( K/ v4 a - * 如果worker->count=1则不会报错$ `+ N$ d, q0 G3 f3 q2 ~ N2 Y- Y
- */
+ |. o5 l4 J$ M+ O3 w8 s, G7 o - $inner_worker = new Worker('http://0.0.0.0:2016');0 c0 @! P3 m* f
- $inner_worker->onMessage = 'on_message';
3 K! }$ [' z4 R6 `# R% R. d o$ y - // 执行监听。这里会报Address already in use错误
$ f& H) y1 m! K# T: i$ V7 h8 y - $inner_worker->listen();* Y" n6 |7 z7 l4 B6 J- G
- };
, Y N+ l. b0 Q% g7 ?+ A - I& z k+ R' ?3 J7 m0 h( e$ C
- $worker->onMessage = 'on_message'; D9 n8 O3 {8 S, E1 Y+ F* z
- ( q8 J# `. `3 ?, R
- function on_message($connection, $data)" P( @: r' C5 G9 L8 s1 N
- {9 F: F0 G( B5 W l% w. w; l( z7 v
- $connection->send("hello\n");
- v' g! ]) U# a) o - }1 v. ?+ k+ ~2 S! I; Y/ k# A
- # f2 X1 h {5 M2 |5 Z" _
- // 运行worker
& z0 p* g& y) z2 @6 r - Worker::runAll();
$ q% l$ D+ a! h. b" w$ o1 l - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
$ M' _; K- p x% ?2 E5 c
7 L( H: }; l6 r. @- use Workerman\Worker;; b: T9 w* f: l7 j( X
- require_once './Workerman/Autoloader.php';9 @! _8 ]9 ^" M( \6 g
( f0 G# o. V3 h0 {+ P- $worker = new Worker('text://0.0.0.0:2015');
1 N/ A* Q: e& I" v* T7 u - // 4个进程
! H1 Z% [6 B1 t" ` - $worker->count = 4;/ q" ~' ^) x! G/ B, C+ B
- // 每个进程启动后在当前进程新增一个Worker监听
0 a! k h: r$ f4 ^: K$ p - $worker->onWorkerStart = function($worker)5 a9 Q- _2 ]2 p4 B" P# R+ h
- {' F" T0 G, m8 }$ H; v6 P5 X
- $inner_worker = new Worker('http://0.0.0.0:2016');6 x& z, K( N! }9 p* u w0 t
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
* E! c1 d: J- q/ I7 h- L( x) p) W - $inner_worker->reusePort = true;6 ^7 A7 R) s! @8 a9 U; ~
- $inner_worker->onMessage = 'on_message';
3 S1 K/ g; T- w& B4 x6 c, F0 i: f - // 执行监听。正常监听不会报错
' ^# N/ p+ g; n; N' B - $inner_worker->listen();
7 t. `& d3 a$ c; }5 ` }2 E" M - };0 ?. \/ `4 J S- Y& u3 o! z
7 d1 x3 Q1 B* r; T Q: H- $worker->onMessage = 'on_message';
+ e& i. H5 B& ^1 |3 Y9 g3 Y- ] - 2 S$ d( U- V0 y* O
- function on_message($connection, $data)* Z& ?0 n {! n% A+ Q8 F
- {
# j( H5 A# H5 R' Z& `5 M - $connection->send("hello\n");! x) ~, L0 T/ P& e0 ~9 ^
- }! ^6 }* L9 z, Q3 h; P3 M- q
- ; V/ q1 _ F* ~- V
- // 运行worker
# a; l) j- N7 D7 k' x" ? - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php. S/ X5 \- O0 i `% o% G8 A7 m7 f
- use Workerman\Worker;/ k1 F$ X# w5 b+ a
- require_once './Workerman/Autoloader.php';4 R. W% q& [$ t& T! n' d
- // 初始化一个worker容器,监听1234端口
9 Y% w- P1 W7 D0 z - $worker = new Worker('websocket://0.0.0.0:1234');
+ z, F. P6 Q2 u* u+ ^7 W
' c* `8 O: z! s: s- /*
# ]4 z# F7 P( h2 y - * 注意这里进程数必须设置为1,否则会报端口占用错误8 b' m1 d9 P$ c7 I! ~/ T
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
9 B. v8 H `7 O0 C3 O - */
2 t9 ?4 n) h4 t v - $worker->count = 1;5 S; r" }- i4 O; g
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
9 ?: X W1 f+ W3 q/ q% H - $worker->onWorkerStart = function($worker)
6 C6 j8 {& @9 @2 B* \3 t7 S& V - {$ C @+ V8 G O9 }
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符+ i; d( z$ m' U& W
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
9 l5 C k$ z$ U - $inner_text_worker->onMessage = function($connection, $buffer)( q9 r* f* R% p$ A
- {1 O; l/ J% x; Q0 e# v$ h
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
$ r5 U# F/ n3 b' h8 N2 k4 m - $data = json_decode($buffer, true);
- C2 X- d: p1 ~! j( f+ T - $uid = $data['uid'];+ L1 P% y$ }' n: r; K
- // 通过workerman,向uid的页面推送数据2 A5 X2 y, v0 R$ P( j# Z
- $ret = sendMessageByUid($uid, $buffer);
' G% L) J. s0 s# |& B* M - // 返回推送结果* _& e4 j! r1 m/ ^ m# W
- $connection->send($ret ? 'ok' : 'fail');
5 u6 M- [) l R9 ^2 V# g - };( H7 ?' N$ S! A* l5 W+ X5 w# |) P
- // ## 执行监听 ##
% ^0 j6 W# n X5 `7 z - $inner_text_worker->listen();
' ~& [9 r/ h; g! G9 P - };4 g: k! J, S# ]8 g. s0 o. e
- // 新增加一个属性,用来保存uid到connection的映射
5 y8 f3 O8 b0 A8 T: f1 f& q - $worker->uidConnections = array();
% z3 Y! B R( ` - // 当有客户端发来消息时执行的回调函数
8 w. s$ a [- E P7 j% h6 L - $worker->onMessage = function($connection, $data)
* F9 ^& S Q6 k8 j; p1 m& g - {
. } `( C+ S9 m: \0 t& I( h - global $worker;& h5 Y/ d {; r2 h: z. F( r
- // 判断当前客户端是否已经验证,既是否设置了uid
# n) V" H9 M3 K! S8 o# W1 s" a - if(!isset($connection->uid)). _. L/ N% F# `5 u6 o9 K3 {
- {
) L, z5 F; s& F' u5 Z/ |0 V# Z% N - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
4 R+ ~0 z9 J& J - $connection->uid = $data;
- k$ ]- O& M; ?7 ?3 s* v' r - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,4 a4 J0 k; k8 C- Y$ o- e/ \ r
- * 实现针对特定uid推送数据: s" l2 d9 x1 F
- */
2 l( B) [( C* y, H1 d5 R9 O7 N - $worker->uidConnections[$connection->uid] = $connection;% e8 g- F2 N' ~2 L/ Z! Y; I% C
- return;/ A1 ^* l+ o( X5 O
- }$ @. r' L6 _$ o0 v( [9 m' H& w
- };
) k2 e- G5 N4 Y$ R
3 u2 E s1 a, ~3 E- // 当有客户端连接断开时0 w1 V5 D1 D P& s; b$ e3 X
- $worker->onClose = function($connection); o2 f1 c4 ~6 a5 a
- {9 U) _: g+ Q, |( O
- global $worker;
) w$ _; i9 y0 B4 D( H, h7 p; h - if(isset($connection->uid))
$ F/ {& t* J5 |+ V# K( e - {' c5 ]8 i' \* h; v8 ?) u
- // 连接断开时删除映射4 z6 C+ R) Y, X4 N6 T% L
- unset($worker->uidConnections[$connection->uid]);
0 ~" Z' N9 b. ?4 e8 `1 x1 `9 _ - }
+ n+ L3 S( O q) @. S& H( e - };4 N! f2 Q; [3 I, P- I- H0 J' r
- " D' t/ s8 S' h# V
- // 向所有验证的用户推送数据
A+ f2 r) s& E* D# m7 b2 c2 v# \ - function broadcast($message)
0 r; U: O; ]+ R) D7 m6 P - {3 I$ w! E$ _# I; X3 m
- global $worker;
2 w* X7 i# o+ _% M - foreach($worker->uidConnections as $connection)
9 r, D9 k) u* v t+ p) m6 @ - {1 G7 J0 f$ n) p+ l
- $connection->send($message);
( M9 P* L! e1 Y m/ {6 T# R8 F - }
# ^1 B N, Z5 d - }
& `" w1 m3 }0 R - & R1 {- X+ |& M: W* k' U, ?: D/ d& B
- // 针对uid推送数据
6 w% {& {4 [; I% ^5 X- i9 C - function sendMessageByUid($uid, $message)
y) {7 B, ?: o$ N - {
# W3 P* n0 B9 J; ^2 m, l7 v - global $worker;
! r) M) T3 M2 t) E) E& S - if(isset($worker->uidConnections[$uid]))
- z' Z. o* { ^# O7 z3 @6 E - {" e# z" E6 h& B
- $connection = $worker->uidConnections[$uid];
/ X7 s! J' r0 t# Y9 t - $connection->send($message);
4 m) u, M% Y) j, R7 w - return true;
1 X' |5 I$ K0 ^5 |" {, v6 q - }
0 w: c+ b$ b7 j1 h, Z6 R1 N) K* |" G - return false; f2 c/ l# v3 H5 Z% e# D. H" W( [
- }: K* A0 X7 y/ c8 t/ F
3 a4 I. K* k9 w4 {6 u- // 运行所有的worker/ Q* y0 m- v& [" {" f/ C
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');) H; c6 l1 _5 M/ B% U8 I
- ws.onopen = function(){5 N0 }) i- S$ ]
- var uid = 'uid1';
, X# Z a$ ]7 Q( i: B - ws.send(uid);
/ a4 M: i, F( E3 F# N7 ?* j - };8 G. s+ W( H; |
- ws.onmessage = function(e){' l# `# M. _$ q, ^- D7 X
- alert(e.data);
. D; c4 \( H: S/ C; a# N - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
1 X7 z" ^; M' g. v9 ]1 E - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);! ~* v2 c4 b1 d; ^- E' h# G
- // 推送的数据,包含uid字段,表示是给这个uid推送
+ o+ ^6 L' r' i( a+ ~ - $data = array('uid'=>'uid1', 'percent'=>'88%');/ d' ]' v+ m) v# x4 u
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 J/ ~; O" G, j% f& B
- fwrite($client, json_encode($data)."\n");
+ B S' F {8 u* W* } - // 读取推送结果
: W2 W: z& T& R5 K$ h0 ` - echo fread($client, 8192);
复制代码
" @9 _6 ^. i/ Y' q
& L" E W% T* m4 u Y; X! ` |