- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
( `, V$ k- g/ @3 j6 \6 e$ ? - require_once __DIR__ . '/Workerman/Autoloader.php';
. I: Y. F# T; ~; P6 U3 R
' s+ @! G& \' d# J+ t+ I- $worker = new Worker();8 @- i& K3 `1 {# C! B
- // 4个进程# X# j0 Q9 F( R( \6 i' D2 `& K' O
- $worker->count = 4;% z* j( r' Q" y# C5 N) C
- // 每个进程启动后在当前进程新增一个Worker监听$ l! ]" B( X3 w& ]: G4 J
- $worker->onWorkerStart = function($worker)
5 D$ q- H h! N! ] - {1 Q+ V: @: ~; c. {. n) E
- /**$ d1 r( m% K% L: R2 L5 f
- * 4个进程启动的时候都创建2016端口的Worker
; f5 d8 Y7 U) k+ J - * 当执行到worker->listen()时会报Address already in use错误
: ^3 J; N5 D9 J6 D - * 如果worker->count=1则不会报错
% o& }/ a8 E& m+ O, K! T* j - */
- u# `( r% \" f# g/ } - $inner_worker = new Worker('http://0.0.0.0:2016');
$ ]* m _: U8 E, ]9 v8 G& O - $inner_worker->onMessage = 'on_message';
' a5 B) p! g( T7 `% d& t! k - // 执行监听。这里会报Address already in use错误: g/ W; Z; ~8 L' r( Y+ v
- $inner_worker->listen();& k3 i1 z$ L6 d6 s
- };, E. W; j6 n+ A% C
- 1 N8 b6 I1 O6 c9 t5 \0 o
- $worker->onMessage = 'on_message';% j# M" J) w1 s- W
- 2 q8 s* G# y$ G! E8 y$ c
- function on_message($connection, $data)
! K2 m$ C- J6 B - {
% Z5 v3 k8 }9 f8 s% E& {9 L5 j7 q - $connection->send("hello\n");9 h0 X& c+ O4 U2 a* W+ j& r& b) j
- }! a+ g: G6 G" E# P6 B0 [9 s" C
- ( s8 p5 H. w; v8 x
- // 运行worker
4 s" h# h9 X& m) |" U# c! C1 o, C" _ - Worker::runAll();
: Q6 v1 r! t' o0 z7 N; B - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:! |5 g3 E- X; S8 C! n9 K- F
- 9 E1 p6 I# A6 l# b
- use Workerman\Worker;
- L2 I7 U6 g* ]" S4 J- Q - require_once './Workerman/Autoloader.php';
* @- b* {6 J/ K5 \2 N' z6 @9 T
/ D" d$ g; ]. ~6 ~' a; c3 [ L- $worker = new Worker('text://0.0.0.0:2015');
& v! O O n/ ? - // 4个进程
- D5 F5 O& |1 B+ t( E% ^, n - $worker->count = 4;, c. m( d7 Z% K, s9 s: A& r
- // 每个进程启动后在当前进程新增一个Worker监听
. Z& R @. W/ \: b! x3 F% \( o - $worker->onWorkerStart = function($worker)
7 ?5 ~. Q" Q* [ T. [' {$ Z - {, i" N3 Q$ p# R, I: ^3 N
- $inner_worker = new Worker('http://0.0.0.0:2016');+ y3 Y( ?+ p1 w6 a s
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
3 Y( O( y! T2 u0 j& k. l, ^ - $inner_worker->reusePort = true;- j- R/ @( M7 u) n$ ]2 J
- $inner_worker->onMessage = 'on_message';% p0 W$ G0 C, J0 w6 a/ b& S8 T
- // 执行监听。正常监听不会报错# R9 P; U" V, g' D
- $inner_worker->listen();" o! s! p- Y1 X; O' Z3 \( j
- };+ j2 @ \3 |$ W! a9 Q
9 m) |+ U- |( r- $worker->onMessage = 'on_message';; k) ]4 F8 j3 V
- % F6 R3 S. |! f6 d# z
- function on_message($connection, $data)
3 S6 C9 s6 s# ?: n Z, t9 F& w - {
2 C3 I5 ~* f4 }& F; y' D+ t - $connection->send("hello\n");& [: ?: Q8 s+ { O2 K
- } O) e7 l/ V% s- a. s8 t; [
- ( u3 t8 {, z5 N2 s3 w
- // 运行worker
0 ]# W) X' K5 ]- S+ C5 {. } - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php: B' a8 V$ l t4 Q( J8 [
- use Workerman\Worker;7 I1 X, h( w: P b; H$ p" _
- require_once './Workerman/Autoloader.php';
6 P( B0 C3 f# c/ ?, p1 |8 J - // 初始化一个worker容器,监听1234端口' E* K6 y( [/ B) G. W- h @
- $worker = new Worker('websocket://0.0.0.0:1234');, `) \; C R" j; R" t4 r
& Y, n8 ~$ H. `- /*
0 ]1 [. ?# o* a* u - * 注意这里进程数必须设置为1,否则会报端口占用错误: y' l3 }, a6 `% g0 K
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)% T9 R) x J$ K8 h S& s" G
- */
" v( U5 g: I- h/ f9 D, ] - $worker->count = 1;* n5 ]$ b7 _% ~: U3 n
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口; M, J: O& e5 r Y# w% o% x
- $worker->onWorkerStart = function($worker)( e/ o. O' Q6 x
- {
9 Z/ e) ~# E( D# l# e8 E - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: F4 ?' }. |1 e! w: _3 A
- $inner_text_worker = new Worker('text://0.0.0.0:5678');9 z8 R1 K) Z; z$ A( E3 r n% ~" g& w
- $inner_text_worker->onMessage = function($connection, $buffer)
/ X& \% C3 B( e3 y! ?- W8 a3 e+ o" R - {; y5 q0 a3 [% G1 H
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据1 L/ v1 |' e# }0 S$ B t7 t! b5 o
- $data = json_decode($buffer, true);6 U, g4 Z# N$ v1 ~/ B6 r; {0 Y
- $uid = $data['uid']; b# r$ k: J4 A$ W0 q8 L0 ~
- // 通过workerman,向uid的页面推送数据
4 P: F9 @8 Q7 C7 H5 D; A% A - $ret = sendMessageByUid($uid, $buffer);
7 Q: T# B6 r0 X# n ?! W - // 返回推送结果
& A6 k4 A6 g3 Z+ Q8 w, N! U3 Q - $connection->send($ret ? 'ok' : 'fail');7 y. y3 P v- ]! w4 U) w* d
- };0 z, d2 G+ n9 u% l3 |: Q" F8 ^
- // ## 执行监听 ##! B( r6 X( [" O
- $inner_text_worker->listen();
a7 c N; r I - };6 P6 z1 Y3 g2 y
- // 新增加一个属性,用来保存uid到connection的映射
( r: j/ W2 o3 _. m' | - $worker->uidConnections = array();
q0 o F3 K$ w# z% q) E - // 当有客户端发来消息时执行的回调函数
0 A8 k7 o% d% |; t" Y - $worker->onMessage = function($connection, $data); I3 m1 r1 F- N6 s% @6 g, n
- {, Y' o5 D) ~% f, E) t Y7 m
- global $worker;. i) w- s8 h+ F- L: ?" a5 b$ Y
- // 判断当前客户端是否已经验证,既是否设置了uid/ j f2 ^. ~) n" I4 D4 O: M. p
- if(!isset($connection->uid))* s1 w4 `- p9 D$ r
- {
" e: s9 E- p' c3 V7 @6 S! u. ^ - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
- f; X d. C+ Z, R: ?) t i - $connection->uid = $data;
" ^& x4 A# Q, k7 P" r8 v - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
* `8 e7 O; L& @, T - * 实现针对特定uid推送数据
: b7 @6 W" |7 \3 S* v2 {6 L - */
, x. d3 x/ V- e( o( u - $worker->uidConnections[$connection->uid] = $connection;6 r, o6 U9 Q, _* u' M- y
- return;
2 n1 @8 Z; v3 h" p* `& O3 C - }
' Q. I7 Q5 w+ X2 O' \% k - };6 F' m/ ^7 U9 i
& H# A9 u9 a3 B3 p- // 当有客户端连接断开时, t; [, C( j* C- F. i
- $worker->onClose = function($connection)! J. f1 U6 C- I7 e( ~, @: m
- {
$ N: W6 c5 l9 W+ Z - global $worker;! }4 @4 t% V2 Z: |/ C, o. r
- if(isset($connection->uid)): \* {' R8 A7 c- T4 ?- M
- {0 k( |! U* l2 E4 u
- // 连接断开时删除映射
( i! z# J# W1 \ - unset($worker->uidConnections[$connection->uid]);
5 @. X2 ? t3 x' A$ U! h9 ? - }! i3 S" Q1 ~9 I+ A3 _0 t1 q
- };
4 F, ?; V4 _9 |$ n: w - % j5 z7 i' ~5 e3 M
- // 向所有验证的用户推送数据
g1 S, N/ c. `5 X) E1 r - function broadcast($message)* _ h0 a0 H1 {& D6 [, U3 `
- {$ t) |# h" U M% n
- global $worker;0 _; y4 G! a: k" _' i0 _' B8 G$ O( ^
- foreach($worker->uidConnections as $connection)
/ A0 N3 p/ ?" F4 V3 P8 v& F - {' z/ D* I( B; Q
- $connection->send($message);
: w8 d' E" {% j% I; c - }! a# l ^1 y* U/ ~
- }' Q. g: V7 L3 a' o9 f3 N5 h: t
' B$ Q) k5 M. H8 j3 t- // 针对uid推送数据/ m. T% ~8 y! ~6 G+ G
- function sendMessageByUid($uid, $message)
7 F/ `* z. }2 j1 U2 k2 f - {
2 Z$ T# @( [. p - global $worker;) U; u/ C3 F/ _9 ^0 n4 b
- if(isset($worker->uidConnections[$uid]))
! w1 E+ [0 M# p; ? - {/ q* h w- d& T* n/ G+ X2 {. ]9 t" j
- $connection = $worker->uidConnections[$uid];
7 x* C5 S" j, J J - $connection->send($message);2 s- ^1 Q! Q+ o9 X% `
- return true;
9 V0 J9 r. N |7 n! N3 L; D - }9 m; s0 S8 q( G
- return false;3 z) `) B1 D M- A- D
- }* N" h1 l. @! K" G* G$ t3 w
8 {: Y6 t/ a- [ D/ ~; p `7 S, i- // 运行所有的worker( \) B) {$ ~- _9 K- J, u
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
7 ^# p$ Z0 b9 V7 n - ws.onopen = function(){
( @2 P5 f" X7 h2 Z! u- [2 } - var uid = 'uid1';
9 l/ |+ O f7 l8 W r- @6 k - ws.send(uid);) ]5 W( P) q2 p& B
- };
7 ^" r/ t+ ^. ~ - ws.onmessage = function(e){
; D) b0 H0 } m* @ - alert(e.data); B+ x0 S6 J2 o
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
1 e8 m+ Z. l& {, |6 B6 _& w1 a - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 Y7 u8 G9 w2 ?: R( X
- // 推送的数据,包含uid字段,表示是给这个uid推送4 e% n9 D' ~0 m, `1 R' {
- $data = array('uid'=>'uid1', 'percent'=>'88%');& B" X# z3 i/ h, P
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符+ U" E5 v8 b; \' J
- fwrite($client, json_encode($data)."\n");; L" ]( B# t: I% A* ]2 E) ]
- // 读取推送结果
0 |0 {, \ }9 W9 [ - echo fread($client, 8192);
复制代码 " i* _) m6 o+ A
( ?# ^2 a: H7 z
|