您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10495|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;. A) ]8 F+ U7 W  }  b/ h
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    5 K4 m9 E3 f& |" z+ C/ S

  3. . q: x2 ?- Q3 J8 j8 {3 N
  4. $worker = new Worker();
    6 W0 Y5 R8 a3 f
  5. // 4个进程( E6 L0 V+ F" @# H3 k
  6. $worker->count = 4;
    % q' `7 `' i% W, P# b- t: G
  7. // 每个进程启动后在当前进程新增一个Worker监听
    2 U' A* c- ~& j. z9 X# s2 s0 _
  8. $worker->onWorkerStart = function($worker)5 {: g5 v$ P( V  H. f
  9. {) ]/ I7 f$ ?3 Q+ j' Y
  10.     /**3 m) V! j5 @* D* c3 @3 F5 s
  11.      * 4个进程启动的时候都创建2016端口的Worker7 q5 G9 W0 |4 j1 H# U2 L
  12.      * 当执行到worker->listen()时会报Address already in use错误* s8 Y4 j6 M+ _1 ?' q
  13.      * 如果worker->count=1则不会报错
    * W+ f  r) f9 U. }; L% c
  14.      */1 V, P9 `) ~2 F& _0 a
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');+ V" d4 M) p, t6 m* D( C$ H' N
  16.     $inner_worker->onMessage = 'on_message';
    4 z6 C6 k+ J* q
  17.     // 执行监听。这里会报Address already in use错误
    / u6 T# O4 n3 Q0 P
  18.     $inner_worker->listen();
    & }" U7 v, w+ j7 ?5 k
  19. };
    9 n! z. F/ ]: C  N* z  O+ X' T* f
  20. 7 b! _4 L1 `0 m1 z9 ]1 _
  21. $worker->onMessage = 'on_message';
    . m: E9 }' r5 W! X) I

  22. 6 j6 R/ T7 r9 U& \7 z) Z* }) A/ U
  23. function on_message($connection, $data)
      t' j! s4 _0 X6 I
  24. {
    . u9 i- `- @+ B, X
  25.     $connection->send("hello\n");
    ( Q# S. M: @7 M# c; Q
  26. }
    % `- C/ B2 |$ T, i

  27. - ]9 N4 o4 H1 Z) D' `
  28. // 运行worker
    . y" z) o3 W5 |" Z# `  `
  29. Worker::runAll();
    ; }. ~+ W# m, X" B. I
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:; P4 P' s9 ]6 b+ s! H% M/ ]

  31. ! }9 e, h( s; x6 N
  32. use Workerman\Worker;, d& ~! r5 [1 L" [# Q
  33. require_once './Workerman/Autoloader.php';  O, ^9 G8 i9 g. Q' g, T' H

  34. + y6 s  y6 D+ }% B
  35. $worker = new Worker('text://0.0.0.0:2015');
    % d+ j' K# P& |
  36. // 4个进程4 s; E. k6 h7 R0 \
  37. $worker->count = 4;6 Z; D, ]$ j- I) |! S4 m
  38. // 每个进程启动后在当前进程新增一个Worker监听
    6 X9 Z( l9 Q( t, y+ ^  t5 r
  39. $worker->onWorkerStart = function($worker), a2 E& C4 J& e7 v* E7 z- B
  40. {
    . d, Z: Z6 I: B" o( P
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ( k, s, n2 \8 @9 O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    0 g: u, }5 [4 X5 g% M5 r1 ~( K
  43.     $inner_worker->reusePort = true;
    3 R; F9 D9 g) x# E
  44.     $inner_worker->onMessage = 'on_message';: U+ n; r) ]/ y, L3 a
  45.     // 执行监听。正常监听不会报错
    & S! o5 D9 W' D9 j1 W' l3 T
  46.     $inner_worker->listen();* `: ^% x) S% z0 b% _
  47. };* `% q9 K: ]* Q* m' C
  48. + d+ t- V3 e& V
  49. $worker->onMessage = 'on_message';
    & l" f+ x6 ]4 Y, r# c0 p* o+ q! F

  50. 0 A4 u3 u( ]  t7 N7 s( H9 _
  51. function on_message($connection, $data)
    3 Z6 d( M& y1 {% \+ u# E
  52. {6 h" k" P$ N% \3 x6 a: \$ P
  53.     $connection->send("hello\n");  }3 r0 r# j+ P, |$ ]: U, F8 i
  54. }0 h' j+ o5 E; Q3 `  A) U. @
  55. ' G& P, K4 N# _' V
  56. // 运行worker( n( K. y; F9 R# `) M
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    ) h1 T: b6 S  N7 x
  2. use Workerman\Worker;
    # J  h, }  z# ?2 E4 q. l  P' p
  3. require_once './Workerman/Autoloader.php';
    . c/ Y3 S! l& t9 I7 t  c
  4. // 初始化一个worker容器,监听1234端口6 u% ~$ v" m. G/ x
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    7 L* t1 g. f0 }5 w' h

  6. ; t/ t* e) Z* F/ n; A1 }
  7. /*
    . n  q$ a3 @" H6 t  O& x! Q1 f
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误7 H% V; p4 @* t
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    3 Q7 ^9 i4 j, W5 T( I$ Q( h
  10. */
    2 a1 j) j  F4 I
  11. $worker->count = 1;* z8 R# N; A: \4 H6 }5 B- p' q
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口$ C9 m/ [! Y7 c; y' B, F1 z
  13. $worker->onWorkerStart = function($worker)
    0 y$ D* a4 L- _
  14. {  [: c. B7 q- t" B% N; f
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: z/ S9 l' T# K9 [' D
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    , z& a3 @1 G% n' i$ p" |
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( x7 u$ P* Y; d2 o8 ^
  18.     {# I) E# G0 p. N: w5 F
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据: i* q, p* n1 d0 b  G, m
  20.         $data = json_decode($buffer, true);! ~8 U2 q& k( ]# f+ }
  21.         $uid = $data['uid'];; g, D5 _7 w( L  i7 C% U
  22.         // 通过workerman,向uid的页面推送数据" K  P' ?7 w2 t$ }
  23.         $ret = sendMessageByUid($uid, $buffer);
    6 M) h" [/ s4 {+ [5 v- L# C1 d
  24.         // 返回推送结果# D: V! J) u; w' B& N7 D
  25.         $connection->send($ret ? 'ok' : 'fail');; C9 p( u- w: I7 J9 I- c5 V
  26.     };7 O! h+ q0 y( ]% q
  27.     // ## 执行监听 ##* F5 j' j) e' s+ F* ?9 B9 C
  28.     $inner_text_worker->listen();5 N% E' x9 w) |2 _6 w& Z0 r
  29. };
    # f, q8 Q5 ~& ~( h/ \4 A# K
  30. // 新增加一个属性,用来保存uid到connection的映射
    - _) [& E! B& k" Y
  31. $worker->uidConnections = array();( k- i6 O3 c: o4 O
  32. // 当有客户端发来消息时执行的回调函数( e" M. X* N- Y+ h8 U+ g& Y0 Y
  33. $worker->onMessage = function($connection, $data)6 m2 S. `5 U/ ~. t, q
  34. {& C' s# j1 T( }1 O" E
  35.     global $worker;
    0 {7 \# @9 z0 N# o  y
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    " ?' I* z% P0 y* k" `% i/ w) k
  37.     if(!isset($connection->uid))) o' T+ |7 [9 k2 @+ r- q; i! d
  38.     {
    & P- x) m6 Q1 E0 u2 x
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证), B$ x' L' T# y! b
  40.        $connection->uid = $data;: y& ]6 n9 ]+ b; I
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    - `; J* k; i0 q' Z8 g8 I2 b$ I" {. N
  42.         * 实现针对特定uid推送数据
    ; M8 Z( \- o0 c; Q; k2 m3 [
  43.         */
    : u% r0 \( N  D8 a  L0 P! X+ F0 Q8 r
  44.        $worker->uidConnections[$connection->uid] = $connection;
    * X! Z# Q& C" ^+ y& ?. \
  45.        return;
    4 _0 n( h+ K+ t9 w0 E* O
  46.     }2 ^# K6 V) L) s
  47. };
    % l1 B! i2 z* r) N1 h, g' i
  48. " t- n- W2 e: i4 O& o2 e
  49. // 当有客户端连接断开时
    4 O& ?' E/ g, X4 b9 B
  50. $worker->onClose = function($connection)6 j* i/ C+ [  z- T8 d7 i8 R
  51. {/ A0 a% w+ [% R+ f( k
  52.     global $worker;  \, |1 D& Z1 z9 q- @* \/ ^
  53.     if(isset($connection->uid))& ?# P# G$ q3 E4 E* y/ P. W( F+ a
  54.     {) c: W/ u, S$ {/ `
  55.         // 连接断开时删除映射2 j2 k% z! D1 S5 \( g( u5 E
  56.         unset($worker->uidConnections[$connection->uid]);
    ) Y3 p' v# r2 Y( F( w4 M; D
  57.     }
    6 _, w# V! W; V$ _8 M
  58. };
    9 z5 g& {( {  d, c1 z" C5 n/ x
  59. 8 ]' d/ }, p# E! v
  60. // 向所有验证的用户推送数据& u4 g8 O) y: F9 T1 S* V; [( D
  61. function broadcast($message)
    / r' X$ U4 o; S+ r; V
  62. {2 L  o' n; l7 j# Y* V+ {" K
  63.    global $worker;- v: O: t" k4 W# A% f* g
  64.    foreach($worker->uidConnections as $connection)
    - _! U" X  t* \/ U1 b+ L
  65.    {
    9 p5 t7 x8 `4 F8 o1 ~
  66.         $connection->send($message);
    , b, k5 i" p$ W; {  M) ~- b( L
  67.    }
    , V# a8 P3 m8 x% x( f
  68. }
    $ y+ F8 |2 k1 i/ c* @- _; j
  69. 0 s. B& ~) c- k) V6 F
  70. // 针对uid推送数据  ^: U$ N% p- Z. R
  71. function sendMessageByUid($uid, $message)
      o+ ?; s/ u3 d' _# Y6 C) T9 _
  72. {
      I4 G  @: ~) t- V. ^  c7 J6 z
  73.     global $worker;( B: Y; C1 {/ L3 d& y; }
  74.     if(isset($worker->uidConnections[$uid]))7 |0 r/ u! M. \: z
  75.     {
    ! A: j% x* R9 |9 Z# t2 E* F6 {
  76.         $connection = $worker->uidConnections[$uid];
    9 G0 |& K( g1 z. ]
  77.         $connection->send($message);
    4 a% U* u* F! l4 u; h- A9 h
  78.         return true;
    8 A/ Z* f3 H. ~# {- K6 l
  79.     }( n& y2 i  e* s
  80.     return false;" E) J+ I* |$ [+ }8 u3 o$ ]# y
  81. }
    - `9 H8 [+ Y( F+ f

  82. 8 A% @; |! ~8 O; w) U
  83. // 运行所有的worker
    5 S: Y5 M8 v, a
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    4 J0 j4 `4 s8 a) T9 f
  2. ws.onopen = function(){
    " }" b9 o- K2 u( b
  3.     var uid = 'uid1';# k% R. o. g. a0 D: m! o! p4 I
  4.     ws.send(uid);" @% {- B* P5 A9 C, \
  5. };( D, q* r0 O5 s1 k" g8 l
  6. ws.onmessage = function(e){- Y4 M9 L0 r- e) U
  7.     alert(e.data);, y* y. V# v5 l9 m+ E( H
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    & @; z; p% U" P9 O2 q" l
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);5 o" Z/ [: X0 X$ @% d' Q
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    $ S3 ]3 w$ q# q1 Z& I' \: ^
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');0 S* s* G, m; U  j- D, j+ J) w
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    5 Q* U$ f6 }% c) G
  6. fwrite($client, json_encode($data)."\n");
    " ^  z) |5 J$ M# g6 u' M
  7. // 读取推送结果/ X' M0 [7 G2 q' ?' a
  8. echo fread($client, 8192);
复制代码

  u% U" q$ @: s& \
( s. M4 _* ~& f; Q/ p! b9 U0 W$ J
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-17 20:24 , Processed in 0.119143 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!